You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/07 17:45:46 UTC
[13/15] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65c695c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65c695c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65c695c9
Branch: refs/heads/cassandra-3.0
Commit: 65c695c9186afb80a2c4d6b904b66524ebbe0754
Parents: 6818ba9 39c7869
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri Aug 7 16:12:55 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri Aug 7 16:12:55 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../JMXEnabledScheduledThreadPoolExecutor.java | 137 -------------------
...EnabledScheduledThreadPoolExecutorMBean.java | 26 ----
.../JMXEnabledThreadPoolExecutor.java | 5 +
.../cql3/functions/JavaBasedUDFunction.java | 21 ++-
.../cql3/functions/ScriptBasedUDFunction.java | 17 +--
.../cassandra/db/HintedHandOffManager.java | 26 +++-
7 files changed, 41 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index caf6fb4,98c8d73..13614cc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -68,8 -30,9 +68,9 @@@ Merged from 2.1
* Handle corrupt files on startup (CASSANDRA-9686)
* Fix clientutil jar and tests (CASSANDRA-9760)
* (cqlsh) Allow the SSL protocol version to be specified through the
- config file or environment variables (CASSANDRA-9544)
+ config file or environment variables (CASSANDRA-9544)
Merged from 2.0:
+ * Remove erroneous pending HH tasks from tpstats/jmx (CASSANDRA-9129)
* Don't cast expected bf size to an int (CASSANDRA-9959)
* checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765)
* Complete CASSANDRA-8448 fix (CASSANDRA-9519)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index 2b86701,2b86701..a7a54f2
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@@ -53,6 -53,6 +53,11 @@@ public class JMXEnabledThreadPoolExecut
this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, priority), "internal");
}
++ public JMXEnabledThreadPoolExecutor(NamedThreadFactory threadFactory, String jmxPath)
++ {
++ this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory, jmxPath);
++ }
++
public JMXEnabledThreadPoolExecutor(int corePoolSize,
long keepAliveTime,
TimeUnit unit,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
index e066581,0000000..3d8ebd9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
@@@ -1,645 -1,0 +1,640 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.functions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import java.nio.ByteBuffer;
+import java.security.CodeSource;
+import java.security.PermissionCollection;
+import java.security.ProtectionDomain;
+import java.security.SecureClassLoader;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.ThreadLocalRandom;
++import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.io.ByteStreams;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.DataType;
- import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
++import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
- import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.eclipse.jdt.core.compiler.IProblem;
+import org.eclipse.jdt.internal.compiler.*;
+import org.eclipse.jdt.internal.compiler.Compiler;
+import org.eclipse.jdt.internal.compiler.classfmt.ClassFileReader;
+import org.eclipse.jdt.internal.compiler.classfmt.ClassFormatException;
+import org.eclipse.jdt.internal.compiler.env.ICompilationUnit;
+import org.eclipse.jdt.internal.compiler.env.INameEnvironment;
+import org.eclipse.jdt.internal.compiler.env.NameEnvironmentAnswer;
+import org.eclipse.jdt.internal.compiler.impl.CompilerOptions;
+import org.eclipse.jdt.internal.compiler.problem.DefaultProblemFactory;
+
+final class JavaBasedUDFunction extends UDFunction
+{
+ private static final String BASE_PACKAGE = "org.apache.cassandra.cql3.udf.gen";
+
+ static final Logger logger = LoggerFactory.getLogger(JavaBasedUDFunction.class);
+
+ private static final AtomicInteger classSequence = new AtomicInteger();
+
- private static final JMXEnabledScheduledThreadPoolExecutor executor =
- new JMXEnabledScheduledThreadPoolExecutor(
- DatabaseDescriptor.getMaxHintsThread(),
- new NamedThreadFactory("UserDefinedFunctions",
- Thread.MIN_PRIORITY,
- udfClassLoader,
- new SecurityThreadGroup("UserDefinedFunctions", null)),
- "userfunction");
++ private static final JMXEnabledThreadPoolExecutor executor =
++ new JMXEnabledThreadPoolExecutor(new NamedThreadFactory("UserDefinedFunctions",
++ Thread.MIN_PRIORITY,
++ udfClassLoader,
++ new SecurityThreadGroup("UserDefinedFunctions", null)),
++ "userfunction");
+
+ private static final EcjTargetClassLoader targetClassLoader = new EcjTargetClassLoader();
+
+ private static final UDFByteCodeVerifier udfByteCodeVerifier = new UDFByteCodeVerifier();
+
+ private static final ProtectionDomain protectionDomain;
+
+ private static final IErrorHandlingPolicy errorHandlingPolicy = DefaultErrorHandlingPolicies.proceedWithAllProblems();
+ private static final IProblemFactory problemFactory = new DefaultProblemFactory(Locale.ENGLISH);
+ private static final CompilerOptions compilerOptions;
+
+ /**
+ * Poor man's template - just a text file splitted at '#' chars.
+ * Each string at an even index is a constant string (just copied),
+ * each string at an odd index is an 'instruction'.
+ */
+ private static final String[] javaSourceTemplate;
+
+ static
+ {
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "forName");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getClassLoader");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResource");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResourceAsStream");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "clearAssertionStatus");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResource");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResourceAsStream");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResources");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemClassLoader");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResource");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResourceAsStream");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResources");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "loadClass");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setClassAssertionStatus");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setDefaultAssertionStatus");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setPackageAssertionStatus");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/nio/ByteBuffer", "allocateDirect");
+
+ Map<String, String> settings = new HashMap<>();
+ settings.put(CompilerOptions.OPTION_LineNumberAttribute,
+ CompilerOptions.GENERATE);
+ settings.put(CompilerOptions.OPTION_SourceFileAttribute,
+ CompilerOptions.DISABLED);
+ settings.put(CompilerOptions.OPTION_ReportDeprecation,
+ CompilerOptions.IGNORE);
+ settings.put(CompilerOptions.OPTION_Source,
+ CompilerOptions.VERSION_1_8);
+ settings.put(CompilerOptions.OPTION_TargetPlatform,
+ CompilerOptions.VERSION_1_8);
+
+ compilerOptions = new CompilerOptions(settings);
+ compilerOptions.parseLiteralExpressionsAsConstants = true;
+
+ try (InputStream input = JavaBasedUDFunction.class.getResource("JavaSourceUDF.txt").openConnection().getInputStream())
+ {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ FBUtilities.copy(input, output, Long.MAX_VALUE);
+ String template = output.toString();
+
+ StringTokenizer st = new StringTokenizer(template, "#");
+ javaSourceTemplate = new String[st.countTokens()];
+ for (int i = 0; st.hasMoreElements(); i++)
+ javaSourceTemplate[i] = st.nextToken();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ CodeSource codeSource;
+ try
+ {
+ codeSource = new CodeSource(new URL("udf", "localhost", 0, "/java", new URLStreamHandler()
+ {
+ protected URLConnection openConnection(URL u)
+ {
+ return null;
+ }
+ }), (Certificate[])null);
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ protectionDomain = new ProtectionDomain(codeSource, ThreadAwareSecurityManager.noPermissions, targetClassLoader, null);
+ }
+
+ private final JavaUDF javaUDF;
+
+ JavaBasedUDFunction(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes,
+ AbstractType<?> returnType, boolean calledOnNullInput, String body)
+ {
+ super(name, argNames, argTypes, UDHelper.driverTypes(argTypes),
+ returnType, UDHelper.driverType(returnType), calledOnNullInput, "java", body);
+
+ // javaParamTypes is just the Java representation for argTypes resp. argDataTypes
+ Class<?>[] javaParamTypes = UDHelper.javaTypes(argDataTypes, calledOnNullInput);
+ // javaReturnType is just the Java representation for returnType resp. returnDataType
+ Class<?> javaReturnType = returnDataType.asJavaClass();
+
+ // put each UDF in a separate package to prevent cross-UDF code access
+ String pkgName = BASE_PACKAGE + '.' + generateClassName(name, 'p');
+ String clsName = generateClassName(name, 'C');
+
+ String executeInternalName = generateClassName(name, 'x');
+
+ StringBuilder javaSourceBuilder = new StringBuilder();
+ int lineOffset = 1;
+ for (int i = 0; i < javaSourceTemplate.length; i++)
+ {
+ String s = javaSourceTemplate[i];
+
+ // strings at odd indexes are 'instructions'
+ if ((i & 1) == 1)
+ {
+ switch (s)
+ {
+ case "package_name":
+ s = pkgName;
+ break;
+ case "class_name":
+ s = clsName;
+ break;
+ case "body":
+ lineOffset = countNewlines(javaSourceBuilder);
+ s = body;
+ break;
+ case "arguments":
+ s = generateArguments(javaParamTypes, argNames);
+ break;
+ case "argument_list":
+ s = generateArgumentList(javaParamTypes, argNames);
+ break;
+ case "return_type":
+ s = javaSourceName(javaReturnType);
+ break;
+ case "execute_internal_name":
+ s = executeInternalName;
+ break;
+ }
+ }
+
+ javaSourceBuilder.append(s);
+ }
+
+ String targetClassName = pkgName + '.' + clsName;
+
+ String javaSource = javaSourceBuilder.toString();
+
+ logger.debug("Compiling Java source UDF '{}' as class '{}' using source:\n{}", name, targetClassName, javaSource);
+
+ try
+ {
+ EcjCompilationUnit compilationUnit = new EcjCompilationUnit(javaSource, targetClassName);
+
+ org.eclipse.jdt.internal.compiler.Compiler compiler = new Compiler(compilationUnit,
+ errorHandlingPolicy,
+ compilerOptions,
+ compilationUnit,
+ problemFactory);
+ compiler.compile(new ICompilationUnit[]{ compilationUnit });
+
+ if (compilationUnit.problemList != null && !compilationUnit.problemList.isEmpty())
+ {
+ boolean fullSource = false;
+ StringBuilder problems = new StringBuilder();
+ for (IProblem problem : compilationUnit.problemList)
+ {
+ long ln = problem.getSourceLineNumber() - lineOffset;
+ if (ln < 1L)
+ {
+ if (problem.isError())
+ {
+ // if generated source around UDF source provided by the user is buggy,
+ // this code is appended.
+ problems.append("GENERATED SOURCE ERROR: line ")
+ .append(problem.getSourceLineNumber())
+ .append(" (in generated source): ")
+ .append(problem.getMessage())
+ .append('\n');
+ fullSource = true;
+ }
+ }
+ else
+ {
+ problems.append("Line ")
+ .append(Long.toString(ln))
+ .append(": ")
+ .append(problem.getMessage())
+ .append('\n');
+ }
+ }
+
+ if (fullSource)
+ throw new InvalidRequestException("Java source compilation failed:\n" + problems + "\n generated source:\n" + javaSource);
+ else
+ throw new InvalidRequestException("Java source compilation failed:\n" + problems);
+ }
+
+ // Verify the UDF bytecode against use of probably dangerous code
+ Set<String> errors = udfByteCodeVerifier.verify(targetClassLoader.classData(targetClassName));
+ String validDeclare = "not allowed method declared: " + executeInternalName + '(';
+ String validCall = "call to " + targetClassName.replace('.', '/') + '.' + executeInternalName + "()";
+ for (Iterator<String> i = errors.iterator(); i.hasNext();)
+ {
+ String error = i.next();
+ // we generate a random name of the private, internal execute method, which is detected by the byte-code verifier
+ if (error.startsWith(validDeclare) || error.equals(validCall))
+ {
+ i.remove();
+ }
+ }
+ if (!errors.isEmpty())
+ throw new InvalidRequestException("Java UDF validation failed: " + errors);
+
+ // Load the class and create a new instance of it
+ Thread thread = Thread.currentThread();
+ ClassLoader orig = thread.getContextClassLoader();
+ try
+ {
+ thread.setContextClassLoader(UDFunction.udfClassLoader);
+ // Execute UDF intiialization from UDF class loader
+
+ Class cls = Class.forName(targetClassName, false, targetClassLoader);
+
+ if (cls.getDeclaredMethods().length != 2 || cls.getDeclaredConstructors().length != 1)
+ throw new InvalidRequestException("Check your source to not define additional Java methods or constructors");
+ MethodType methodType = MethodType.methodType(void.class)
+ .appendParameterTypes(DataType.class, DataType[].class);
+ MethodHandle ctor = MethodHandles.lookup().findConstructor(cls, methodType);
+ this.javaUDF = (JavaUDF) ctor.invokeWithArguments(returnDataType, argDataTypes);
+ }
+ finally
+ {
+ thread.setContextClassLoader(orig);
+ }
+ }
+ catch (InvocationTargetException e)
+ {
+ // in case of an ITE, use the cause
+ throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e.getCause()));
+ }
+ catch (VirtualMachineError e)
+ {
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e));
+ }
+ }
+
+ protected ExecutorService executor()
+ {
+ return executor;
+ }
+
+ protected ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> params)
+ {
+ return javaUDF.executeImpl(protocolVersion, params);
+ }
+
+
+ private static int countNewlines(StringBuilder javaSource)
+ {
+ int ln = 0;
+ for (int i = 0; i < javaSource.length(); i++)
+ if (javaSource.charAt(i) == '\n')
+ ln++;
+ return ln;
+ }
+
+ private static String generateClassName(FunctionName name, char prefix)
+ {
+ String qualifiedName = name.toString();
+
+ StringBuilder sb = new StringBuilder(qualifiedName.length() + 10);
+ sb.append(prefix);
+ for (int i = 0; i < qualifiedName.length(); i++)
+ {
+ char c = qualifiedName.charAt(i);
+ if (Character.isJavaIdentifierPart(c))
+ sb.append(c);
+ else
+ sb.append(Integer.toHexString(((short)c)&0xffff));
+ }
+ sb.append('_')
+ .append(ThreadLocalRandom.current().nextInt() & 0xffffff)
+ .append('_')
+ .append(classSequence.incrementAndGet());
+ return sb.toString();
+ }
+
+ private static String javaSourceName(Class<?> type)
+ {
+ String n = type.getName();
+ return n.startsWith("java.lang.") ? type.getSimpleName() : n;
+ }
+
+ private static String generateArgumentList(Class<?>[] paramTypes, List<ColumnIdentifier> argNames)
+ {
+ // initial builder size can just be a guess (prevent temp object allocations)
+ StringBuilder code = new StringBuilder(32 * paramTypes.length);
+ for (int i = 0; i < paramTypes.length; i++)
+ {
+ if (i > 0)
+ code.append(", ");
+ code.append(javaSourceName(paramTypes[i]))
+ .append(' ')
+ .append(argNames.get(i));
+ }
+ return code.toString();
+ }
+
+ private static String generateArguments(Class<?>[] paramTypes, List<ColumnIdentifier> argNames)
+ {
+ StringBuilder code = new StringBuilder(64 * paramTypes.length);
+ for (int i = 0; i < paramTypes.length; i++)
+ {
+ if (i > 0)
+ code.append(",\n");
+
+ if (logger.isDebugEnabled())
+ code.append(" /* parameter '").append(argNames.get(i)).append("' */\n");
+
+ code
+ // cast to Java type
+ .append(" (").append(javaSourceName(paramTypes[i])).append(") ")
+ // generate object representation of input parameter (call UDFunction.compose)
+ .append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(i).append("))");
+ }
+ return code.toString();
+ }
+
+ private static String composeMethod(Class<?> type)
+ {
+ return (type.isPrimitive()) ? ("super.compose_" + type.getName()) : "super.compose";
+ }
+
+ // Java source UDFs are a very simple compilation task, which allows us to let one class implement
+ // all interfaces required by ECJ.
+ static final class EcjCompilationUnit implements ICompilationUnit, ICompilerRequestor, INameEnvironment
+ {
+ List<IProblem> problemList;
+ private final String className;
+ private final char[] sourceCode;
+
+ EcjCompilationUnit(String sourceCode, String className)
+ {
+ this.className = className;
+ this.sourceCode = sourceCode.toCharArray();
+ }
+
+ // ICompilationUnit
+
+ @Override
+ public char[] getFileName()
+ {
+ return sourceCode;
+ }
+
+ @Override
+ public char[] getContents()
+ {
+ return sourceCode;
+ }
+
+ @Override
+ public char[] getMainTypeName()
+ {
+ int dot = className.lastIndexOf('.');
+ return ((dot > 0) ? className.substring(dot + 1) : className).toCharArray();
+ }
+
+ @Override
+ public char[][] getPackageName()
+ {
+ StringTokenizer izer = new StringTokenizer(className, ".");
+ char[][] result = new char[izer.countTokens() - 1][];
+ for (int i = 0; i < result.length; i++)
+ result[i] = izer.nextToken().toCharArray();
+ return result;
+ }
+
+ @Override
+ public boolean ignoreOptionalProblems()
+ {
+ return false;
+ }
+
+ // ICompilerRequestor
+
+ @Override
+ public void acceptResult(CompilationResult result)
+ {
+ if (result.hasErrors())
+ {
+ IProblem[] problems = result.getProblems();
+ if (problemList == null)
+ problemList = new ArrayList<>(problems.length);
+ Collections.addAll(problemList, problems);
+ }
+ else
+ {
+ ClassFile[] classFiles = result.getClassFiles();
+ for (ClassFile classFile : classFiles)
+ targetClassLoader.addClass(className, classFile.getBytes());
+ }
+ }
+
+ // INameEnvironment
+
+ @Override
+ public NameEnvironmentAnswer findType(char[][] compoundTypeName)
+ {
+ StringBuilder result = new StringBuilder();
+ for (int i = 0; i < compoundTypeName.length; i++)
+ {
+ if (i > 0)
+ result.append('.');
+ result.append(compoundTypeName[i]);
+ }
+ return findType(result.toString());
+ }
+
+ @Override
+ public NameEnvironmentAnswer findType(char[] typeName, char[][] packageName)
+ {
+ StringBuilder result = new StringBuilder();
+ int i = 0;
+ for (; i < packageName.length; i++)
+ {
+ if (i > 0)
+ result.append('.');
+ result.append(packageName[i]);
+ }
+ if (i > 0)
+ result.append('.');
+ result.append(typeName);
+ return findType(result.toString());
+ }
+
+ private NameEnvironmentAnswer findType(String className)
+ {
+ if (className.equals(this.className))
+ {
+ return new NameEnvironmentAnswer(this, null);
+ }
+
+ String resourceName = className.replace('.', '/') + ".class";
+
+ try (InputStream is = UDFunction.udfClassLoader.getResourceAsStream(resourceName))
+ {
+ if (is != null)
+ {
+ byte[] classBytes = ByteStreams.toByteArray(is);
+ char[] fileName = className.toCharArray();
+ ClassFileReader classFileReader = new ClassFileReader(classBytes, fileName, true);
+ return new NameEnvironmentAnswer(classFileReader, null);
+ }
+ }
+ catch (IOException | ClassFormatException exc)
+ {
+ throw new RuntimeException(exc);
+ }
+ return null;
+ }
+
+ private boolean isPackage(String result)
+ {
+ if (result.equals(this.className))
+ return false;
+ String resourceName = result.replace('.', '/') + ".class";
+ try (InputStream is = UDFunction.udfClassLoader.getResourceAsStream(resourceName))
+ {
+ return is == null;
+ }
+ catch (IOException e)
+ {
+ // we are here, since close on is failed. That means it was not null
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isPackage(char[][] parentPackageName, char[] packageName)
+ {
+ StringBuilder result = new StringBuilder();
+ int i = 0;
+ if (parentPackageName != null)
+ for (; i < parentPackageName.length; i++)
+ {
+ if (i > 0)
+ result.append('.');
+ result.append(parentPackageName[i]);
+ }
+
+ if (Character.isUpperCase(packageName[0]) && !isPackage(result.toString()))
+ return false;
+ if (i > 0)
+ result.append('.');
+ result.append(packageName);
+
+ return isPackage(result.toString());
+ }
+
+ @Override
+ public void cleanup()
+ {
+ }
+ }
+
+ static final class EcjTargetClassLoader extends SecureClassLoader
+ {
+ EcjTargetClassLoader()
+ {
+ super(UDFunction.udfClassLoader);
+ }
+
+ // This map is usually empty.
+ // It only contains data *during* UDF compilation but not during runtime.
+ //
+ // addClass() is invoked by ECJ after successful compilation of the generated Java source.
+ // loadClass(targetClassName) is invoked by buildUDF() after ECJ returned from successful compilation.
+ //
+ private final Map<String, byte[]> classes = new ConcurrentHashMap<>();
+
+ void addClass(String className, byte[] classData)
+ {
+ classes.put(className, classData);
+ }
+
+ byte[] classData(String className)
+ {
+ return classes.get(className);
+ }
+
+ protected Class<?> findClass(String name) throws ClassNotFoundException
+ {
+ // remove the class binary - it's only used once - so it's wasting heap
+ byte[] classData = classes.remove(name);
+
+ if (classData != null)
+ return defineClass(name, classData, 0, classData.length, protectionDomain);
+
+ return getParent().loadClass(name);
+ }
+
+ protected PermissionCollection getPermissions(CodeSource codesource)
+ {
+ return ThreadAwareSecurityManager.noPermissions;
+ }
+ }}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
index d79960f,0000000..8b448fe
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
@@@ -1,265 -1,0 +1,262 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.functions;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import java.nio.ByteBuffer;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.CodeSource;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.security.ProtectionDomain;
+import java.security.cert.Certificate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineFactory;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import javax.script.SimpleScriptContext;
+
- import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
++import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
- import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+final class ScriptBasedUDFunction extends UDFunction
+{
+ static final Map<String, Compilable> scriptEngines = new HashMap<>();
+
+ private static final ProtectionDomain protectionDomain;
+ private static final AccessControlContext accessControlContext;
+
+ //
+ // For scripted UDFs we have to rely on the security mechanisms of the scripting engine and
+ // SecurityManager - especially SecurityManager.checkPackageAccess(). Unlike Java-UDFs, strict checking
+ // of class access via the UDF class loader is not possible, since e.g. Nashorn builds its own class loader
+ // (jdk.nashorn.internal.runtime.ScriptLoader / jdk.nashorn.internal.runtime.NashornLoader) configured with
+ // a system class loader.
+ //
+ private static final String[] allowedPackagesArray =
+ {
+ // following required by jdk.nashorn.internal.objects.Global.initJavaAccess()
+ "",
+ "com",
+ "edu",
+ "java",
+ "javax",
+ "javafx",
+ "org",
+ // following required by Nashorn runtime
+ "java.lang",
+ "java.lang.invoke",
+ "java.lang.reflect",
+ "java.nio.charset",
+ "java.util",
+ "java.util.concurrent",
+ "javax.script",
+ "sun.reflect",
+ "jdk.internal.org.objectweb.asm.commons",
+ "jdk.nashorn.internal.runtime",
+ "jdk.nashorn.internal.runtime.linker",
+ // following required by Java Driver
+ "java.math",
+ "java.text",
+ "com.google.common.base",
+ "com.google.common.reflect",
+ // following required by UDF
+ "com.datastax.driver.core",
+ "com.datastax.driver.core.utils"
+ };
+
- private static final JMXEnabledScheduledThreadPoolExecutor executor =
- new JMXEnabledScheduledThreadPoolExecutor(
- DatabaseDescriptor.getMaxHintsThread(),
- new NamedThreadFactory("UserDefinedScriptFunctions",
- Thread.MIN_PRIORITY,
- udfClassLoader,
- new SecurityThreadGroup("UserDefinedScriptFunctions", Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedPackagesArray))))),
- "userscripts");
++ private static final JMXEnabledThreadPoolExecutor executor =
++ new JMXEnabledThreadPoolExecutor(new NamedThreadFactory("UserDefinedScriptFunctions",
++ Thread.MIN_PRIORITY,
++ udfClassLoader,
++ new SecurityThreadGroup("UserDefinedScriptFunctions", Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedPackagesArray))))),
++ "userscripts");
+
+ static
+ {
+ ScriptEngineManager scriptEngineManager = new ScriptEngineManager();
+ for (ScriptEngineFactory scriptEngineFactory : scriptEngineManager.getEngineFactories())
+ {
+ ScriptEngine scriptEngine = scriptEngineFactory.getScriptEngine();
+ boolean compilable = scriptEngine instanceof Compilable;
+ if (compilable)
+ {
+ logger.info("Found scripting engine {} {} - {} {} - language names: {}",
+ scriptEngineFactory.getEngineName(), scriptEngineFactory.getEngineVersion(),
+ scriptEngineFactory.getLanguageName(), scriptEngineFactory.getLanguageVersion(),
+ scriptEngineFactory.getNames());
+ for (String name : scriptEngineFactory.getNames())
+ scriptEngines.put(name, (Compilable) scriptEngine);
+ }
+ }
+
+ try
+ {
+ protectionDomain = new ProtectionDomain(new CodeSource(new URL("udf", "localhost", 0, "/script", new URLStreamHandler()
+ {
+ protected URLConnection openConnection(URL u)
+ {
+ return null;
+ }
+ }), (Certificate[]) null), ThreadAwareSecurityManager.noPermissions);
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ accessControlContext = new AccessControlContext(new ProtectionDomain[]{ protectionDomain });
+ }
+
+ private final CompiledScript script;
+
+ ScriptBasedUDFunction(FunctionName name,
+ List<ColumnIdentifier> argNames,
+ List<AbstractType<?>> argTypes,
+ AbstractType<?> returnType,
+ boolean calledOnNullInput,
+ String language,
+ String body)
+ {
+ super(name, argNames, argTypes, returnType, calledOnNullInput, language, body);
+
+ Compilable scriptEngine = scriptEngines.get(language);
+ if (scriptEngine == null)
+ throw new InvalidRequestException(String.format("Invalid language '%s' for function '%s'", language, name));
+
+ // execute compilation with no-permissions to prevent evil code e.g. via "static code blocks" / "class initialization"
+ try
+ {
+ this.script = AccessController.doPrivileged((PrivilegedExceptionAction<CompiledScript>) () -> scriptEngine.compile(body),
+ accessControlContext);
+ }
+ catch (PrivilegedActionException x)
+ {
+ Throwable e = x.getCause();
+ logger.info("Failed to compile function '{}' for language {}: ", name, language, e);
+ throw new InvalidRequestException(
+ String.format("Failed to compile function '%s' for language %s: %s", name, language, e));
+ }
+ }
+
+ protected ExecutorService executor()
+ {
+ return executor;
+ }
+
+ public ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> parameters)
+ {
+ Object[] params = new Object[argTypes.size()];
+ for (int i = 0; i < params.length; i++)
+ params[i] = compose(protocolVersion, i, parameters.get(i));
+
+ ScriptContext scriptContext = new SimpleScriptContext();
+ scriptContext.setAttribute("javax.script.filename", this.name.toString(), ScriptContext.ENGINE_SCOPE);
+ Bindings bindings = scriptContext.getBindings(ScriptContext.ENGINE_SCOPE);
+ for (int i = 0; i < params.length; i++)
+ bindings.put(argNames.get(i).toString(), params[i]);
+
+ Object result;
+ try
+ {
+ // How to prevent Class.forName() _without_ "help" from the script engine ?
+ // NOTE: Nashorn enforces a special permission to allow class-loading, which is not granted - so it's fine.
+
+ result = script.eval(scriptContext);
+ }
+ catch (ScriptException e)
+ {
+ throw new RuntimeException(e);
+ }
+ if (result == null)
+ return null;
+
+ Class<?> javaReturnType = returnDataType.asJavaClass();
+ Class<?> resultType = result.getClass();
+ if (!javaReturnType.isAssignableFrom(resultType))
+ {
+ if (result instanceof Number)
+ {
+ Number rNumber = (Number) result;
+ if (javaReturnType == Integer.class)
+ result = rNumber.intValue();
+ else if (javaReturnType == Long.class)
+ result = rNumber.longValue();
+ else if (javaReturnType == Short.class)
+ result = rNumber.shortValue();
+ else if (javaReturnType == Byte.class)
+ result = rNumber.byteValue();
+ else if (javaReturnType == Float.class)
+ result = rNumber.floatValue();
+ else if (javaReturnType == Double.class)
+ result = rNumber.doubleValue();
+ else if (javaReturnType == BigInteger.class)
+ {
+ if (javaReturnType == Integer.class)
+ result = rNumber.intValue();
+ else if (javaReturnType == Short.class)
+ result = rNumber.shortValue();
+ else if (javaReturnType == Byte.class)
+ result = rNumber.byteValue();
+ else if (javaReturnType == Long.class)
+ result = rNumber.longValue();
+ else if (javaReturnType == Float.class)
+ result = rNumber.floatValue();
+ else if (javaReturnType == Double.class)
+ result = rNumber.doubleValue();
+ else if (javaReturnType == BigInteger.class)
+ {
+ if (rNumber instanceof BigDecimal)
+ result = ((BigDecimal) rNumber).toBigInteger();
+ else if (rNumber instanceof Double || rNumber instanceof Float)
+ result = new BigDecimal(rNumber.toString()).toBigInteger();
+ else
+ result = BigInteger.valueOf(rNumber.longValue());
+ }
+ else if (javaReturnType == BigDecimal.class)
+ // String c'tor of BigDecimal is more accurate than valueOf(double)
+ result = new BigDecimal(rNumber.toString());
+ }
+ else if (javaReturnType == BigDecimal.class)
+ // String c'tor of BigDecimal is more accurate than valueOf(double)
+ result = new BigDecimal(rNumber.toString());
+ }
+ }
+
+ return decompose(protocolVersion, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 17832d7,dae85b7..4656d41
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -36,13 -37,15 +36,15 @@@ import com.google.common.util.concurren
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
+ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
++
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
@@@ -57,9 -62,10 +59,11 @@@ import org.apache.cassandra.io.util.Dat
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.service.*;
+ import org.apache.cassandra.service.StorageProxy;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
/**