You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/07/12 23:43:35 UTC
cassandra git commit: Omit (de)serialization of state variable in UDAs
Repository: cassandra
Updated Branches:
refs/heads/trunk 7751588f7 -> adffb3602
Omit (de)serialization of state variable in UDAs
patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-9613
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/adffb360
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/adffb360
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/adffb360
Branch: refs/heads/trunk
Commit: adffb3602033273efdbb8b5303c62dbf33c36903
Parents: 7751588
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jul 13 09:43:12 2016 +1000
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jul 13 09:43:12 2016 +1000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/functions/JavaBasedUDFunction.java | 49 ++++++++++--
.../cassandra/cql3/functions/JavaUDF.java | 2 +
.../cql3/functions/ScriptBasedUDFunction.java | 25 +++++-
.../cassandra/cql3/functions/UDAggregate.java | 52 ++++++++-----
.../cql3/functions/UDFByteCodeVerifier.java | 7 ++
.../cassandra/cql3/functions/UDFunction.java | 82 ++++++++++++++++++--
.../cassandra/cql3/functions/JavaSourceUDF.txt | 8 ++
.../entities/udfverify/CallClone.java | 5 ++
.../entities/udfverify/CallComDatastax.java | 5 ++
.../entities/udfverify/CallFinalize.java | 5 ++
.../entities/udfverify/CallOrgApache.java | 5 ++
.../entities/udfverify/ClassWithField.java | 5 ++
.../udfverify/ClassWithInitializer.java | 5 ++
.../udfverify/ClassWithInitializer2.java | 5 ++
.../udfverify/ClassWithInitializer3.java | 5 ++
.../entities/udfverify/ClassWithInnerClass.java | 5 ++
.../udfverify/ClassWithInnerClass2.java | 5 ++
.../udfverify/ClassWithStaticInitializer.java | 5 ++
.../udfverify/ClassWithStaticInnerClass.java | 5 ++
.../entities/udfverify/GoodClass.java | 5 ++
.../entities/udfverify/UseOfSynchronized.java | 5 ++
.../udfverify/UseOfSynchronizedWithNotify.java | 5 ++
.../UseOfSynchronizedWithNotifyAll.java | 5 ++
.../udfverify/UseOfSynchronizedWithWait.java | 5 ++
.../udfverify/UseOfSynchronizedWithWaitL.java | 5 ++
.../udfverify/UseOfSynchronizedWithWaitLI.java | 5 ++
.../entities/udfverify/UsingMapEntry.java | 5 ++
.../validation/operations/AggregationTest.java | 59 +++++++++++++-
29 files changed, 349 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b0a118..df07ba0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
* Create a system table to expose prepared statements (CASSANDRA-8831)
* Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
* Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
index 87f5019..34c6cc9 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
@@ -191,7 +191,7 @@ public final class JavaBasedUDFunction extends UDFunction
// javaParamTypes is just the Java representation for argTypes resp. argDataTypes
TypeToken<?>[] javaParamTypes = UDHelper.typeTokens(argCodecs, calledOnNullInput);
- // javaReturnType is just the Java representation for returnType resp. returnDataType
+ // javaReturnType is just the Java representation of returnType (as provided by returnCodec)
TypeToken<?> javaReturnType = returnCodec.getJavaType();
// put each UDF in a separate package to prevent cross-UDF code access
@@ -222,7 +222,10 @@ public final class JavaBasedUDFunction extends UDFunction
s = body;
break;
case "arguments":
- s = generateArguments(javaParamTypes, argNames);
+ s = generateArguments(javaParamTypes, argNames, false);
+ break;
+ case "arguments_aggregate":
+ s = generateArguments(javaParamTypes, argNames, true);
break;
case "argument_list":
s = generateArgumentList(javaParamTypes, argNames);
@@ -326,7 +329,7 @@ public final class JavaBasedUDFunction extends UDFunction
}
}
- if (nonSyntheticMethodCount != 2 || cls.getDeclaredConstructors().length != 1)
+ if (nonSyntheticMethodCount != 3 || cls.getDeclaredConstructors().length != 1)
throw new InvalidRequestException("Check your source to not define additional Java methods or constructors");
MethodType methodType = MethodType.methodType(void.class)
.appendParameterTypes(TypeCodec.class, TypeCodec[].class, UDFContext.class);
@@ -364,6 +367,10 @@ public final class JavaBasedUDFunction extends UDFunction
return javaUDF.executeImpl(protocolVersion, params);
}
+ protected Object executeAggregateUserDefined(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ return javaUDF.executeAggregateImpl(protocolVersion, firstParam, params);
+ }
private static int countNewlines(StringBuilder javaSource)
{
@@ -417,22 +424,48 @@ public final class JavaBasedUDFunction extends UDFunction
return code.toString();
}
- private static String generateArguments(TypeToken<?>[] paramTypes, List<ColumnIdentifier> argNames)
+ /**
+ * Generate Java source code snippet for the arguments part to call the UDF implementation function -
+ * i.e. the {@code private #return_type# #execute_internal_name#(#argument_list#)} function
+ * (see {@code JavaSourceUDF.txt} template file for details).
+ * <p>
+ * This method generates the arguments code snippet for both {@code executeImpl} and
+ * {@code executeAggregateImpl}. General signature for both is the {@code protocolVersion} and
+ * then all UDF arguments. For aggregation UDF calls the first argument is always unserialized as
+ * that is the state variable.
+ * </p>
+ * <p>
+ * An example output for {@code executeImpl}:
+ * {@code (double) super.compose_double(protocolVersion, 0, params.get(0)), (double) super.compose_double(protocolVersion, 1, params.get(1))}
+ * </p>
+ * <p>
+ * Similar output for {@code executeAggregateImpl}:
+ * {@code (double) firstParam, (double) super.compose_double(protocolVersion, 1, params.get(0))}
+ * </p>
+ */
+ private static String generateArguments(TypeToken<?>[] paramTypes, List<ColumnIdentifier> argNames, boolean forAggregate)
{
StringBuilder code = new StringBuilder(64 * paramTypes.length);
for (int i = 0; i < paramTypes.length; i++)
{
if (i > 0)
+ // add separator, if not the first argument
code.append(",\n");
+ // add comment only if trace is enabled
if (logger.isTraceEnabled())
code.append(" /* parameter '").append(argNames.get(i)).append("' */\n");
- code
- // cast to Java type
- .append(" (").append(javaSourceName(paramTypes[i])).append(") ")
+ // cast to Java type
+ code.append(" (").append(javaSourceName(paramTypes[i])).append(") ");
+
+ if (forAggregate && i == 0)
+ // special case for aggregations where the state variable (1st arg to state + final function and
+ // return value from state function) is not re-serialized
+ code.append("firstParam");
+ else
// generate object representation of input parameter (call UDFunction.compose)
- .append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(i).append("))");
+ code.append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(forAggregate ? i - 1 : i).append("))");
}
return code.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java b/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
index 7410f1f..56a7ced 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
@@ -45,6 +45,8 @@ public abstract class JavaUDF
protected abstract ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params);
+ protected abstract Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params);
+
protected Object compose(int protocolVersion, int argIndex, ByteBuffer value)
{
return UDFunction.compose(argCodecs, protocolVersion, argIndex, value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
index b524163..8c15dc9 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
@@ -177,6 +177,29 @@ final class ScriptBasedUDFunction extends UDFunction
for (int i = 0; i < params.length; i++)
params[i] = compose(protocolVersion, i, parameters.get(i));
+ Object result = executeScriptInternal(params);
+
+ return decompose(protocolVersion, result);
+ }
+
+ /**
+ * Like {@link #executeUserDefined(int, List)} but the first parameter is already in non-serialized form.
+ * The remaining parameters (the 2nd parameter and all following ones) are passed in {@code parameters}.
+ * This is used to prevent superfluous (de)serialization of the state of aggregates,
+ * i.e. the scalar functions of aggregates are called using this variant.
+ */
+ protected Object executeAggregateUserDefined(int protocolVersion, Object firstParam, List<ByteBuffer> parameters)
+ {
+ Object[] params = new Object[argTypes.size()];
+ params[0] = firstParam;
+ for (int i = 1; i < params.length; i++)
+ params[i] = compose(protocolVersion, i, parameters.get(i - 1));
+
+ return executeScriptInternal(params);
+ }
+
+ private Object executeScriptInternal(Object[] params)
+ {
ScriptContext scriptContext = new SimpleScriptContext();
scriptContext.setAttribute("javax.script.filename", this.name.toString(), ScriptContext.ENGINE_SCOPE);
Bindings bindings = scriptContext.getBindings(ScriptContext.ENGINE_SCOPE);
@@ -251,7 +274,7 @@ final class ScriptBasedUDFunction extends UDFunction
}
}
- return decompose(protocolVersion, result);
+ return result;
}
private final class UDFContextWrapper extends AbstractJSObject
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
index 52b8163..6570ba8 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
@@ -24,6 +24,7 @@ import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.TypeCodec;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.Functions;
@@ -36,7 +37,9 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
{
protected static final Logger logger = LoggerFactory.getLogger(UDAggregate.class);
- protected final AbstractType<?> stateType;
+ private final AbstractType<?> stateType;
+ private final TypeCodec stateTypeCodec;
+ private final TypeCodec returnTypeCodec;
protected final ByteBuffer initcond;
private final ScalarFunction stateFunction;
private final ScalarFunction finalFunction;
@@ -52,6 +55,8 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
this.stateFunction = stateFunc;
this.finalFunction = finalFunc;
this.stateType = stateFunc != null ? stateFunc.returnType() : null;
+ this.stateTypeCodec = stateType != null ? UDHelper.codecFor(UDHelper.driverType(stateType)) : null;
+ this.returnTypeCodec = returnType != null ? UDHelper.codecFor(UDHelper.driverType(returnType)) : null;
this.initcond = initcond;
}
@@ -68,7 +73,7 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
List<AbstractType<?>> stateTypes = new ArrayList<>(argTypes.size() + 1);
stateTypes.add(stateType);
stateTypes.addAll(argTypes);
- List<AbstractType<?>> finalTypes = Collections.<AbstractType<?>>singletonList(stateType);
+ List<AbstractType<?>> finalTypes = Collections.singletonList(stateType);
return new UDAggregate(name,
argTypes,
returnType,
@@ -81,7 +86,7 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
List<AbstractType<?>> argTypes,
AbstractType<?> returnType,
ByteBuffer initcond,
- final InvalidRequestException reason)
+ InvalidRequestException reason)
{
return new UDAggregate(name, argTypes, returnType, null, null, initcond)
{
@@ -150,48 +155,55 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
private long stateFunctionCount;
private long stateFunctionDuration;
- private ByteBuffer state;
- {
- reset();
- }
+ private Object state;
+ private boolean needsInit = true;
public void addInput(int protocolVersion, List<ByteBuffer> values) throws InvalidRequestException
{
+ if (needsInit)
+ {
+ state = initcond != null ? UDHelper.deserialize(stateTypeCodec, protocolVersion, initcond.duplicate()) : null;
+ stateFunctionDuration = 0;
+ stateFunctionCount = 0;
+ needsInit = false;
+ }
+
long startTime = System.nanoTime();
stateFunctionCount++;
- List<ByteBuffer> fArgs = new ArrayList<>(values.size() + 1);
- fArgs.add(state);
- fArgs.addAll(values);
if (stateFunction instanceof UDFunction)
{
UDFunction udf = (UDFunction)stateFunction;
- if (udf.isCallableWrtNullable(fArgs))
- state = udf.execute(protocolVersion, fArgs);
+ if (udf.isCallableWrtNullable(values))
+ state = udf.executeForAggregate(protocolVersion, state, values);
}
else
{
- state = stateFunction.execute(protocolVersion, fArgs);
+ throw new UnsupportedOperationException("UDAs only support UDFs");
}
stateFunctionDuration += (System.nanoTime() - startTime) / 1000;
}
public ByteBuffer compute(int protocolVersion) throws InvalidRequestException
{
+ assert !needsInit;
+
// final function is traced in UDFunction
Tracing.trace("Executed UDA {}: {} call(s) to state function {} in {}\u03bcs", name(), stateFunctionCount, stateFunction.name(), stateFunctionDuration);
if (finalFunction == null)
- return state;
+ return UDFunction.decompose(stateTypeCodec, protocolVersion, state);
- List<ByteBuffer> fArgs = Collections.singletonList(state);
- ByteBuffer result = finalFunction.execute(protocolVersion, fArgs);
- return result;
+ if (finalFunction instanceof UDFunction)
+ {
+ UDFunction udf = (UDFunction)finalFunction;
+ Object result = udf.executeForAggregate(protocolVersion, state, Collections.emptyList());
+ return UDFunction.decompose(returnTypeCodec, protocolVersion, result);
+ }
+ throw new UnsupportedOperationException("UDAs only support UDFs");
}
public void reset()
{
- state = initcond != null ? initcond.duplicate() : null;
- stateFunctionDuration = 0;
- stateFunctionCount = 0;
+ needsInit = true;
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java b/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java
index cfaa70f..7d28fcd 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFByteCodeVerifier.java
@@ -108,6 +108,13 @@ public final class UDFByteCodeVerifier
// the executeImpl method - ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
return new ExecuteImplVisitor(errors);
}
+ if ("executeAggregateImpl".equals(name) && "(ILjava/lang/Object;Ljava/util/List;)Ljava/lang/Object;".equals(desc))
+ {
+ if (Opcodes.ACC_PROTECTED != access)
+ errors.add("executeAggregateImpl not protected");
+ // the executeAggregateImpl method - Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ return new ExecuteImplVisitor(errors);
+ }
if ("<clinit>".equals(name))
{
errors.add("static initializer declared");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 6e8d187..70d459f 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -28,6 +28,7 @@ import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -258,12 +259,22 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
return Executors.newSingleThreadExecutor();
}
+ protected Object executeAggregateUserDefined(int protocolVersion, Object firstParam, List<ByteBuffer> parameters)
+ {
+ throw broken();
+ }
+
public ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> parameters)
{
- throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully "
- + "for the following reason: %s. Please see the server log for details",
- this,
- reason.getMessage()));
+ throw broken();
+ }
+
+ private InvalidRequestException broken()
+ {
+ return new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully "
+ + "for the following reason: %s. Please see the server log for details",
+ this,
+ reason.getMessage()));
}
};
}
@@ -301,6 +312,44 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
}
}
+ /**
+ * Like {@link #execute(int, List)} but the first parameter is already in non-serialized form.
+ * The remaining parameters (the 2nd parameter and all following ones) are passed in {@code parameters}.
+ * This is used to prevent superfluous (de)serialization of the state of aggregates,
+ * i.e. the scalar functions of aggregates are called using this variant.
+ */
+ public final Object executeForAggregate(int protocolVersion, Object firstParam, List<ByteBuffer> parameters)
+ {
+ assertUdfsEnabled(language);
+
+ if (!calledOnNullInput && firstParam == null || !isCallableWrtNullable(parameters))
+ return null;
+
+ long tStart = System.nanoTime();
+ parameters = makeEmptyParametersNull(parameters);
+
+ try
+ {
+ // Using async UDF execution is expensive (adds about 100us overhead per invocation on a Core-i7 MBPr).
+ Object result = DatabaseDescriptor.enableUserDefinedFunctionsThreads()
+ ? executeAggregateAsync(protocolVersion, firstParam, parameters)
+ : executeAggregateUserDefined(protocolVersion, firstParam, parameters);
+ Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
+ return result;
+ }
+ catch (InvalidRequestException e)
+ {
+ throw e;
+ }
+ catch (Throwable t)
+ {
+ logger.debug("Invocation of user-defined function '{}' failed", this, t);
+ if (t instanceof VirtualMachineError)
+ throw (VirtualMachineError) t;
+ throw FunctionExecutionException.create(this, t);
+ }
+ }
+
public static void assertUdfsEnabled(String language)
{
if (!DatabaseDescriptor.enableUserDefinedFunctions())
@@ -344,10 +393,31 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
{
ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime();
- Future<ByteBuffer> future = executor().submit(() -> {
+ return async(threadIdAndCpuTime, () -> {
threadIdAndCpuTime.setup();
return executeUserDefined(protocolVersion, parameters);
});
+ }
+
+ /**
+ * Like {@link #executeAsync(int, List)} but the first parameter is already in non-serialized form.
+ * The remaining parameters (the 2nd parameter and all following ones) are passed in {@code parameters}.
+ * This is used to prevent superfluous (de)serialization of the state of aggregates,
+ * i.e. the scalar functions of aggregates are called using this variant.
+ */
+ private Object executeAggregateAsync(int protocolVersion, Object firstParam, List<ByteBuffer> parameters)
+ {
+ ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime();
+
+ return async(threadIdAndCpuTime, () -> {
+ threadIdAndCpuTime.setup();
+ return executeAggregateUserDefined(protocolVersion, firstParam, parameters);
+ });
+ }
+
+ private <T> T async(ThreadIdAndCpuTime threadIdAndCpuTime, Callable<T> callable)
+ {
+ Future<T> future = executor().submit(callable);
try
{
@@ -445,6 +515,8 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
protected abstract ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> parameters);
+ protected abstract Object executeAggregateUserDefined(int protocolVersion, Object firstParam, List<ByteBuffer> parameters);
+
public boolean isAggregate()
{
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt b/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt
index d736a5a..802081f 100644
--- a/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt
+++ b/src/resources/org/apache/cassandra/cql3/functions/JavaSourceUDF.txt
@@ -25,6 +25,14 @@ public final class #class_name# extends JavaUDF
return super.decompose(protocolVersion, result);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ #return_type# result = #execute_internal_name#(
+#arguments_aggregate#
+ );
+ return result;
+ }
+
private #return_type# #execute_internal_name#(#argument_list#)
{
#body#
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java
index e8bae70..9efa83a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallClone.java
@@ -35,6 +35,11 @@ public final class CallClone extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java
index 1af5b01..4555ff5 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallComDatastax.java
@@ -36,6 +36,11 @@ public final class CallComDatastax extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
DataType.cint();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java
index 5208849..b1ec15f 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallFinalize.java
@@ -35,6 +35,11 @@ public final class CallFinalize extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java
index 758d0d0..728e482 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/CallOrgApache.java
@@ -36,6 +36,11 @@ public final class CallOrgApache extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
DatabaseDescriptor.getClusterName();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java
index 256c2bd..4c38b44 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithField.java
@@ -35,6 +35,11 @@ public final class ClassWithField extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java
index 3366314..cc2738a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer.java
@@ -35,6 +35,11 @@ public final class ClassWithInitializer extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java
index aaf3e7b..780c0e4 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer2.java
@@ -35,6 +35,11 @@ public final class ClassWithInitializer2 extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java
index 4895aa0..e163ec9 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInitializer3.java
@@ -35,6 +35,11 @@ public final class ClassWithInitializer3 extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java
index 2166771..3c4dc9b 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass.java
@@ -35,6 +35,11 @@ public final class ClassWithInnerClass extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java
index 9c18510..b316040 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithInnerClass2.java
@@ -35,6 +35,11 @@ public final class ClassWithInnerClass2 extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
// this is fine
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java
index 3c958e8..c97a94a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInitializer.java
@@ -35,6 +35,11 @@ public final class ClassWithStaticInitializer extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java
index fada145..1b019cc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/ClassWithStaticInnerClass.java
@@ -35,6 +35,11 @@ public final class ClassWithStaticInnerClass extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java
index eb25f72..54821b9 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/GoodClass.java
@@ -35,6 +35,11 @@ public final class GoodClass extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java
index bbbc823..dba846d 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronized.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronized extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
synchronized (this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java
index 07c70c7..63c319c 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotify.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithNotify extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
synchronized (this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java
index 529c995..4d0c2a0 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithNotifyAll.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithNotifyAll extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
synchronized (this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java
index 6e39813..b002086 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWait.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithWait extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
synchronized (this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java
index ac29211..f128fac 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitL.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithWaitL extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
synchronized (this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java
index 3b9ce8b..d439518 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UseOfSynchronizedWithWaitLI.java
@@ -35,6 +35,11 @@ public final class UseOfSynchronizedWithWaitLI extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
synchronized (this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java
index 5091dc1..b99dbfd 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/udfverify/UsingMapEntry.java
@@ -37,6 +37,11 @@ public final class UsingMapEntry extends JavaUDF
super(returnDataType, argDataTypes, udfContext);
}
+ protected Object executeAggregateImpl(int protocolVersion, Object firstParam, List<ByteBuffer> params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
protected ByteBuffer executeImpl(int protocolVersion, List<ByteBuffer> params)
{
Map<String, String> map = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adffb360/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
index 24a9528..506d533 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.cql3.validation.operations;
import java.math.BigDecimal;
-import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
@@ -40,6 +39,8 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.TurboFilterList;
import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter;
import ch.qos.logback.classic.turbo.TurboFilter;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TupleValue;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -48,7 +49,6 @@ import org.apache.cassandra.cql3.UntypedResultSet.Row;
import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.DynamicCompositeType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.FunctionExecutionException;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -1883,6 +1883,7 @@ public class AggregationTest extends CQLTester
row(finalFunc, initCond));
}
+ @Test
public void testCustomTypeInitcond() throws Throwable
{
try
@@ -1966,4 +1967,58 @@ public class AggregationTest extends CQLTester
assertRows(execute("select avg(val) from %s where bucket in (1, 2, 3);"),
row(a));
}
+
+ @Test
+ public void testSameStateInstance() throws Throwable
+ {
+ // CASSANDRA-9613 removes the necessity to re-serialize the state variable for each
+ // UDA state function and final function call.
+ //
+ // To test that the same state object instance is used during each invocation of the
+ // state and final function, this test uses a trick:
+ // it stores the identity hash code of the state variable in the state tuple. The test then
+ // just asserts that the identity hash code is the same for all invocations
+ // of the state function and the final function.
+
+ String sf = createFunction(KEYSPACE,
+ "tuple<int,int,int,int>, int",
+ "CREATE FUNCTION %s(s tuple<int,int,int,int>, i int) " +
+ "CALLED ON NULL INPUT " +
+ "RETURNS tuple<int,int,int,int> " +
+ "LANGUAGE java " +
+ "AS 's.setInt(i, System.identityHashCode(s)); return s;'");
+
+ String ff = createFunction(KEYSPACE,
+ "tuple<int,int,int,int>",
+ "CREATE FUNCTION %s(s tuple<int,int,int,int>) " +
+ "CALLED ON NULL INPUT " +
+ "RETURNS tuple<int,int,int,int> " +
+ "LANGUAGE java " +
+ "AS 's.setInt(3, System.identityHashCode(s)); return s;'");
+
+ String a = createAggregate(KEYSPACE,
+ "int",
+ "CREATE AGGREGATE %s(int) " +
+ "SFUNC " + shortFunctionName(sf) + ' ' +
+ "STYPE tuple<int,int,int,int> " +
+ "FINALFUNC " + shortFunctionName(ff) + ' ' +
+ "INITCOND (0,1,2)");
+
+ createTable("CREATE TABLE %s (a int primary key, b int)");
+ execute("INSERT INTO %s (a, b) VALUES (0, 0)");
+ execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+ execute("INSERT INTO %s (a, b) VALUES (2, 2)");
+ try (Session s = sessionNet())
+ {
+ com.datastax.driver.core.Row row = s.execute("SELECT " + a + "(b) FROM " + KEYSPACE + '.' + currentTable()).one();
+ TupleValue tuple = row.getTupleValue(0);
+ int h0 = tuple.getInt(0);
+ int h1 = tuple.getInt(1);
+ int h2 = tuple.getInt(2);
+ int h3 = tuple.getInt(3);
+ assertEquals(h0, h1);
+ assertEquals(h0, h2);
+ assertEquals(h0, h3);
+ }
+ }
}