You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/07 17:45:46 UTC

[13/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65c695c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65c695c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65c695c9

Branch: refs/heads/cassandra-3.0
Commit: 65c695c9186afb80a2c4d6b904b66524ebbe0754
Parents: 6818ba9 39c7869
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri Aug 7 16:12:55 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri Aug 7 16:12:55 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../JMXEnabledScheduledThreadPoolExecutor.java  | 137 -------------------
 ...EnabledScheduledThreadPoolExecutorMBean.java |  26 ----
 .../JMXEnabledThreadPoolExecutor.java           |   5 +
 .../cql3/functions/JavaBasedUDFunction.java     |  21 ++-
 .../cql3/functions/ScriptBasedUDFunction.java   |  17 +--
 .../cassandra/db/HintedHandOffManager.java      |  26 +++-
 7 files changed, 41 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index caf6fb4,98c8d73..13614cc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -68,8 -30,9 +68,9 @@@ Merged from 2.1
   * Handle corrupt files on startup (CASSANDRA-9686)
   * Fix clientutil jar and tests (CASSANDRA-9760)
   * (cqlsh) Allow the SSL protocol version to be specified through the
 -   config file or environment variables (CASSANDRA-9544)
 +    config file or environment variables (CASSANDRA-9544)
  Merged from 2.0:
+  * Remove erroneous pending HH tasks from tpstats/jmx (CASSANDRA-9129)
   * Don't cast expected bf size to an int (CASSANDRA-9959)
   * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765)
   * Complete CASSANDRA-8448 fix (CASSANDRA-9519)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index 2b86701,2b86701..a7a54f2
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@@ -53,6 -53,6 +53,11 @@@ public class JMXEnabledThreadPoolExecut
          this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, priority), "internal");
      }
  
++    public JMXEnabledThreadPoolExecutor(NamedThreadFactory threadFactory, String jmxPath)
++    {
++        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory, jmxPath);
++    }
++
      public JMXEnabledThreadPoolExecutor(int corePoolSize,
              long keepAliveTime,
              TimeUnit unit,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
index e066581,0000000..3d8ebd9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
@@@ -1,645 -1,0 +1,640 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3.functions;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.lang.invoke.MethodHandle;
 +import java.lang.invoke.MethodHandles;
 +import java.lang.invoke.MethodType;
 +import java.lang.reflect.InvocationTargetException;
 +import java.net.MalformedURLException;
 +import java.net.URL;
 +import java.net.URLConnection;
 +import java.net.URLStreamHandler;
 +import java.nio.ByteBuffer;
 +import java.security.CodeSource;
 +import java.security.PermissionCollection;
 +import java.security.ProtectionDomain;
 +import java.security.SecureClassLoader;
 +import java.security.cert.Certificate;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Locale;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.StringTokenizer;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.ThreadLocalRandom;
++import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.google.common.io.ByteStreams;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.datastax.driver.core.DataType;
- import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
++import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
- import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.eclipse.jdt.core.compiler.IProblem;
 +import org.eclipse.jdt.internal.compiler.*;
 +import org.eclipse.jdt.internal.compiler.Compiler;
 +import org.eclipse.jdt.internal.compiler.classfmt.ClassFileReader;
 +import org.eclipse.jdt.internal.compiler.classfmt.ClassFormatException;
 +import org.eclipse.jdt.internal.compiler.env.ICompilationUnit;
 +import org.eclipse.jdt.internal.compiler.env.INameEnvironment;
 +import org.eclipse.jdt.internal.compiler.env.NameEnvironmentAnswer;
 +import org.eclipse.jdt.internal.compiler.impl.CompilerOptions;
 +import org.eclipse.jdt.internal.compiler.problem.DefaultProblemFactory;
 +
 +final class JavaBasedUDFunction extends UDFunction
 +{
 +    private static final String BASE_PACKAGE = "org.apache.cassandra.cql3.udf.gen";
 +
 +    static final Logger logger = LoggerFactory.getLogger(JavaBasedUDFunction.class);
 +
 +    private static final AtomicInteger classSequence = new AtomicInteger();
 +
-     private static final JMXEnabledScheduledThreadPoolExecutor executor =
-     new JMXEnabledScheduledThreadPoolExecutor(
-                                              DatabaseDescriptor.getMaxHintsThread(),
-                                              new NamedThreadFactory("UserDefinedFunctions",
-                                                                     Thread.MIN_PRIORITY,
-                                                                     udfClassLoader,
-                                                                     new SecurityThreadGroup("UserDefinedFunctions", null)),
-                                              "userfunction");
++    private static final JMXEnabledThreadPoolExecutor executor =
++    new JMXEnabledThreadPoolExecutor(new NamedThreadFactory("UserDefinedFunctions",
++                                                            Thread.MIN_PRIORITY,
++                                                            udfClassLoader,
++                                                            new SecurityThreadGroup("UserDefinedFunctions", null)),
++                                     "userfunction");
 +
 +    private static final EcjTargetClassLoader targetClassLoader = new EcjTargetClassLoader();
 +
 +    private static final UDFByteCodeVerifier udfByteCodeVerifier = new UDFByteCodeVerifier();
 +
 +    private static final ProtectionDomain protectionDomain;
 +
 +    private static final IErrorHandlingPolicy errorHandlingPolicy = DefaultErrorHandlingPolicies.proceedWithAllProblems();
 +    private static final IProblemFactory problemFactory = new DefaultProblemFactory(Locale.ENGLISH);
 +    private static final CompilerOptions compilerOptions;
 +
 +    /**
 +     * Poor man's template - just a text file splitted at '#' chars.
 +     * Each string at an even index is a constant string (just copied),
 +     * each string at an odd index is an 'instruction'.
 +     */
 +    private static final String[] javaSourceTemplate;
 +
 +    static
 +    {
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "forName");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getClassLoader");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResource");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResourceAsStream");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "clearAssertionStatus");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResource");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResourceAsStream");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResources");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemClassLoader");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResource");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResourceAsStream");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResources");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "loadClass");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setClassAssertionStatus");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setDefaultAssertionStatus");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setPackageAssertionStatus");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/nio/ByteBuffer", "allocateDirect");
 +
 +        Map<String, String> settings = new HashMap<>();
 +        settings.put(CompilerOptions.OPTION_LineNumberAttribute,
 +                     CompilerOptions.GENERATE);
 +        settings.put(CompilerOptions.OPTION_SourceFileAttribute,
 +                     CompilerOptions.DISABLED);
 +        settings.put(CompilerOptions.OPTION_ReportDeprecation,
 +                     CompilerOptions.IGNORE);
 +        settings.put(CompilerOptions.OPTION_Source,
 +                     CompilerOptions.VERSION_1_8);
 +        settings.put(CompilerOptions.OPTION_TargetPlatform,
 +                     CompilerOptions.VERSION_1_8);
 +
 +        compilerOptions = new CompilerOptions(settings);
 +        compilerOptions.parseLiteralExpressionsAsConstants = true;
 +
 +        try (InputStream input = JavaBasedUDFunction.class.getResource("JavaSourceUDF.txt").openConnection().getInputStream())
 +        {
 +            ByteArrayOutputStream output = new ByteArrayOutputStream();
 +            FBUtilities.copy(input, output, Long.MAX_VALUE);
 +            String template = output.toString();
 +
 +            StringTokenizer st = new StringTokenizer(template, "#");
 +            javaSourceTemplate = new String[st.countTokens()];
 +            for (int i = 0; st.hasMoreElements(); i++)
 +                javaSourceTemplate[i] = st.nextToken();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        CodeSource codeSource;
 +        try
 +        {
 +            codeSource = new CodeSource(new URL("udf", "localhost", 0, "/java", new URLStreamHandler()
 +            {
 +                protected URLConnection openConnection(URL u)
 +                {
 +                    return null;
 +                }
 +            }), (Certificate[])null);
 +        }
 +        catch (MalformedURLException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        protectionDomain = new ProtectionDomain(codeSource, ThreadAwareSecurityManager.noPermissions, targetClassLoader, null);
 +    }
 +
 +    private final JavaUDF javaUDF;
 +
 +    JavaBasedUDFunction(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes,
 +                        AbstractType<?> returnType, boolean calledOnNullInput, String body)
 +    {
 +        super(name, argNames, argTypes, UDHelper.driverTypes(argTypes),
 +              returnType, UDHelper.driverType(returnType), calledOnNullInput, "java", body);
 +
 +        // javaParamTypes is just the Java representation for argTypes resp. argDataTypes
 +        Class<?>[] javaParamTypes = UDHelper.javaTypes(argDataTypes, calledOnNullInput);
 +        // javaReturnType is just the Java representation for returnType resp. returnDataType
 +        Class<?> javaReturnType = returnDataType.asJavaClass();
 +
 +        // put each UDF in a separate package to prevent cross-UDF code access
 +        String pkgName = BASE_PACKAGE + '.' + generateClassName(name, 'p');
 +        String clsName = generateClassName(name, 'C');
 +
 +        String executeInternalName = generateClassName(name, 'x');
 +
 +        StringBuilder javaSourceBuilder = new StringBuilder();
 +        int lineOffset = 1;
 +        for (int i = 0; i < javaSourceTemplate.length; i++)
 +        {
 +            String s = javaSourceTemplate[i];
 +
 +            // strings at odd indexes are 'instructions'
 +            if ((i & 1) == 1)
 +            {
 +                switch (s)
 +                {
 +                    case "package_name":
 +                        s = pkgName;
 +                        break;
 +                    case "class_name":
 +                        s = clsName;
 +                        break;
 +                    case "body":
 +                        lineOffset = countNewlines(javaSourceBuilder);
 +                        s = body;
 +                        break;
 +                    case "arguments":
 +                        s = generateArguments(javaParamTypes, argNames);
 +                        break;
 +                    case "argument_list":
 +                        s = generateArgumentList(javaParamTypes, argNames);
 +                        break;
 +                    case "return_type":
 +                        s = javaSourceName(javaReturnType);
 +                        break;
 +                    case "execute_internal_name":
 +                        s = executeInternalName;
 +                        break;
 +                }
 +            }
 +
 +            javaSourceBuilder.append(s);
 +        }
 +
 +        String targetClassName = pkgName + '.' + clsName;
 +
 +        String javaSource = javaSourceBuilder.toString();
 +
 +        logger.debug("Compiling Java source UDF '{}' as class '{}' using source:\n{}", name, targetClassName, javaSource);
 +
 +        try
 +        {
 +            EcjCompilationUnit compilationUnit = new EcjCompilationUnit(javaSource, targetClassName);
 +
 +            org.eclipse.jdt.internal.compiler.Compiler compiler = new Compiler(compilationUnit,
 +                                                                               errorHandlingPolicy,
 +                                                                               compilerOptions,
 +                                                                               compilationUnit,
 +                                                                               problemFactory);
 +            compiler.compile(new ICompilationUnit[]{ compilationUnit });
 +
 +            if (compilationUnit.problemList != null && !compilationUnit.problemList.isEmpty())
 +            {
 +                boolean fullSource = false;
 +                StringBuilder problems = new StringBuilder();
 +                for (IProblem problem : compilationUnit.problemList)
 +                {
 +                    long ln = problem.getSourceLineNumber() - lineOffset;
 +                    if (ln < 1L)
 +                    {
 +                        if (problem.isError())
 +                        {
 +                            // if generated source around UDF source provided by the user is buggy,
 +                            // this code is appended.
 +                            problems.append("GENERATED SOURCE ERROR: line ")
 +                                    .append(problem.getSourceLineNumber())
 +                                    .append(" (in generated source): ")
 +                                    .append(problem.getMessage())
 +                                    .append('\n');
 +                            fullSource = true;
 +                        }
 +                    }
 +                    else
 +                    {
 +                        problems.append("Line ")
 +                                .append(Long.toString(ln))
 +                                .append(": ")
 +                                .append(problem.getMessage())
 +                                .append('\n');
 +                    }
 +                }
 +
 +                if (fullSource)
 +                    throw new InvalidRequestException("Java source compilation failed:\n" + problems + "\n generated source:\n" + javaSource);
 +                else
 +                    throw new InvalidRequestException("Java source compilation failed:\n" + problems);
 +            }
 +
 +            // Verify the UDF bytecode against use of probably dangerous code
 +            Set<String> errors = udfByteCodeVerifier.verify(targetClassLoader.classData(targetClassName));
 +            String validDeclare = "not allowed method declared: " + executeInternalName + '(';
 +            String validCall = "call to " + targetClassName.replace('.', '/') + '.' + executeInternalName + "()";
 +            for (Iterator<String> i = errors.iterator(); i.hasNext();)
 +            {
 +                String error = i.next();
 +                // we generate a random name of the private, internal execute method, which is detected by the byte-code verifier
 +                if (error.startsWith(validDeclare) || error.equals(validCall))
 +                {
 +                    i.remove();
 +                }
 +            }
 +            if (!errors.isEmpty())
 +                throw new InvalidRequestException("Java UDF validation failed: " + errors);
 +
 +            // Load the class and create a new instance of it
 +            Thread thread = Thread.currentThread();
 +            ClassLoader orig = thread.getContextClassLoader();
 +            try
 +            {
 +                thread.setContextClassLoader(UDFunction.udfClassLoader);
 +                // Execute UDF intiialization from UDF class loader
 +
 +                Class cls = Class.forName(targetClassName, false, targetClassLoader);
 +
 +                if (cls.getDeclaredMethods().length != 2 || cls.getDeclaredConstructors().length != 1)
 +                    throw new InvalidRequestException("Check your source to not define additional Java methods or constructors");
 +                MethodType methodType = MethodType.methodType(void.class)
 +                                                  .appendParameterTypes(DataType.class, DataType[].class);
 +                MethodHandle ctor = MethodHandles.lookup().findConstructor(cls, methodType);
 +                this.javaUDF = (JavaUDF) ctor.invokeWithArguments(returnDataType, argDataTypes);
 +            }
 +            finally
 +            {
 +                thread.setContextClassLoader(orig);
 +            }
 +        }
 +        catch (InvocationTargetException e)
 +        {
 +            // in case of an ITE, use the cause
 +            throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e.getCause()));
 +        }
 +        catch (VirtualMachineError e)
 +        {
 +            throw e;
 +        }
 +        catch (Throwable e)
 +        {
 +            throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e));
 +        }
 +    }
 +
 +    protected ExecutorService executor()
 +    {
 +        return executor;
 +    }
 +
 +    protected ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> params)
 +    {
 +        return javaUDF.executeImpl(protocolVersion, params);
 +    }
 +
 +
 +    private static int countNewlines(StringBuilder javaSource)
 +    {
 +        int ln = 0;
 +        for (int i = 0; i < javaSource.length(); i++)
 +            if (javaSource.charAt(i) == '\n')
 +                ln++;
 +        return ln;
 +    }
 +
 +    private static String generateClassName(FunctionName name, char prefix)
 +    {
 +        String qualifiedName = name.toString();
 +
 +        StringBuilder sb = new StringBuilder(qualifiedName.length() + 10);
 +        sb.append(prefix);
 +        for (int i = 0; i < qualifiedName.length(); i++)
 +        {
 +            char c = qualifiedName.charAt(i);
 +            if (Character.isJavaIdentifierPart(c))
 +                sb.append(c);
 +            else
 +                sb.append(Integer.toHexString(((short)c)&0xffff));
 +        }
 +        sb.append('_')
 +          .append(ThreadLocalRandom.current().nextInt() & 0xffffff)
 +          .append('_')
 +          .append(classSequence.incrementAndGet());
 +        return sb.toString();
 +    }
 +
 +    private static String javaSourceName(Class<?> type)
 +    {
 +        String n = type.getName();
 +        return n.startsWith("java.lang.") ? type.getSimpleName() : n;
 +    }
 +
 +    private static String generateArgumentList(Class<?>[] paramTypes, List<ColumnIdentifier> argNames)
 +    {
 +        // initial builder size can just be a guess (prevent temp object allocations)
 +        StringBuilder code = new StringBuilder(32 * paramTypes.length);
 +        for (int i = 0; i < paramTypes.length; i++)
 +        {
 +            if (i > 0)
 +                code.append(", ");
 +            code.append(javaSourceName(paramTypes[i]))
 +                .append(' ')
 +                .append(argNames.get(i));
 +        }
 +        return code.toString();
 +    }
 +
 +    private static String generateArguments(Class<?>[] paramTypes, List<ColumnIdentifier> argNames)
 +    {
 +        StringBuilder code = new StringBuilder(64 * paramTypes.length);
 +        for (int i = 0; i < paramTypes.length; i++)
 +        {
 +            if (i > 0)
 +                code.append(",\n");
 +
 +            if (logger.isDebugEnabled())
 +                code.append("            /* parameter '").append(argNames.get(i)).append("' */\n");
 +
 +            code
 +                // cast to Java type
 +                .append("            (").append(javaSourceName(paramTypes[i])).append(") ")
 +                // generate object representation of input parameter (call UDFunction.compose)
 +                .append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(i).append("))");
 +        }
 +        return code.toString();
 +    }
 +
 +    private static String composeMethod(Class<?> type)
 +    {
 +        return (type.isPrimitive()) ? ("super.compose_" + type.getName()) : "super.compose";
 +    }
 +
 +    // Java source UDFs are a very simple compilation task, which allows us to let one class implement
 +    // all interfaces required by ECJ.
 +    static final class EcjCompilationUnit implements ICompilationUnit, ICompilerRequestor, INameEnvironment
 +    {
 +        List<IProblem> problemList;
 +        private final String className;
 +        private final char[] sourceCode;
 +
 +        EcjCompilationUnit(String sourceCode, String className)
 +        {
 +            this.className = className;
 +            this.sourceCode = sourceCode.toCharArray();
 +        }
 +
 +        // ICompilationUnit
 +
 +        @Override
 +        public char[] getFileName()
 +        {
 +            return sourceCode;
 +        }
 +
 +        @Override
 +        public char[] getContents()
 +        {
 +            return sourceCode;
 +        }
 +
 +        @Override
 +        public char[] getMainTypeName()
 +        {
 +            int dot = className.lastIndexOf('.');
 +            return ((dot > 0) ? className.substring(dot + 1) : className).toCharArray();
 +        }
 +
 +        @Override
 +        public char[][] getPackageName()
 +        {
 +            StringTokenizer izer = new StringTokenizer(className, ".");
 +            char[][] result = new char[izer.countTokens() - 1][];
 +            for (int i = 0; i < result.length; i++)
 +                result[i] = izer.nextToken().toCharArray();
 +            return result;
 +        }
 +
 +        @Override
 +        public boolean ignoreOptionalProblems()
 +        {
 +            return false;
 +        }
 +
 +        // ICompilerRequestor
 +
 +        @Override
 +        public void acceptResult(CompilationResult result)
 +        {
 +            if (result.hasErrors())
 +            {
 +                IProblem[] problems = result.getProblems();
 +                if (problemList == null)
 +                    problemList = new ArrayList<>(problems.length);
 +                Collections.addAll(problemList, problems);
 +            }
 +            else
 +            {
 +                ClassFile[] classFiles = result.getClassFiles();
 +                for (ClassFile classFile : classFiles)
 +                    targetClassLoader.addClass(className, classFile.getBytes());
 +            }
 +        }
 +
 +        // INameEnvironment
 +
 +        @Override
 +        public NameEnvironmentAnswer findType(char[][] compoundTypeName)
 +        {
 +            StringBuilder result = new StringBuilder();
 +            for (int i = 0; i < compoundTypeName.length; i++)
 +            {
 +                if (i > 0)
 +                    result.append('.');
 +                result.append(compoundTypeName[i]);
 +            }
 +            return findType(result.toString());
 +        }
 +
 +        @Override
 +        public NameEnvironmentAnswer findType(char[] typeName, char[][] packageName)
 +        {
 +            StringBuilder result = new StringBuilder();
 +            int i = 0;
 +            for (; i < packageName.length; i++)
 +            {
 +                if (i > 0)
 +                    result.append('.');
 +                result.append(packageName[i]);
 +            }
 +            if (i > 0)
 +                result.append('.');
 +            result.append(typeName);
 +            return findType(result.toString());
 +        }
 +
 +        private NameEnvironmentAnswer findType(String className)
 +        {
 +            if (className.equals(this.className))
 +            {
 +                return new NameEnvironmentAnswer(this, null);
 +            }
 +
 +            String resourceName = className.replace('.', '/') + ".class";
 +
 +            try (InputStream is = UDFunction.udfClassLoader.getResourceAsStream(resourceName))
 +            {
 +                if (is != null)
 +                {
 +                    byte[] classBytes = ByteStreams.toByteArray(is);
 +                    char[] fileName = className.toCharArray();
 +                    ClassFileReader classFileReader = new ClassFileReader(classBytes, fileName, true);
 +                    return new NameEnvironmentAnswer(classFileReader, null);
 +                }
 +            }
 +            catch (IOException | ClassFormatException exc)
 +            {
 +                throw new RuntimeException(exc);
 +            }
 +            return null;
 +        }
 +
 +        private boolean isPackage(String result)
 +        {
 +            if (result.equals(this.className))
 +                return false;
 +            String resourceName = result.replace('.', '/') + ".class";
 +            try (InputStream is = UDFunction.udfClassLoader.getResourceAsStream(resourceName))
 +            {
 +                return is == null;
 +            }
 +            catch (IOException e)
 +            {
 +                // we are here, since close on is failed. That means it was not null
 +                return false;
 +            }
 +        }
 +
 +        @Override
 +        public boolean isPackage(char[][] parentPackageName, char[] packageName)
 +        {
 +            StringBuilder result = new StringBuilder();
 +            int i = 0;
 +            if (parentPackageName != null)
 +                for (; i < parentPackageName.length; i++)
 +                {
 +                    if (i > 0)
 +                        result.append('.');
 +                    result.append(parentPackageName[i]);
 +                }
 +
 +            if (Character.isUpperCase(packageName[0]) && !isPackage(result.toString()))
 +                return false;
 +            if (i > 0)
 +                result.append('.');
 +            result.append(packageName);
 +
 +            return isPackage(result.toString());
 +        }
 +
 +        @Override
 +        public void cleanup()
 +        {
 +        }
 +    }
 +
 +    static final class EcjTargetClassLoader extends SecureClassLoader
 +    {
 +        EcjTargetClassLoader()
 +        {
 +            super(UDFunction.udfClassLoader);
 +        }
 +
 +        // This map is usually empty.
 +        // It only contains data *during* UDF compilation but not during runtime.
 +        //
 +        // addClass() is invoked by ECJ after successful compilation of the generated Java source.
 +        // loadClass(targetClassName) is invoked by buildUDF() after ECJ returned from successful compilation.
 +        //
 +        private final Map<String, byte[]> classes = new ConcurrentHashMap<>();
 +
 +        void addClass(String className, byte[] classData)
 +        {
 +            classes.put(className, classData);
 +        }
 +
 +        byte[] classData(String className)
 +        {
 +            return classes.get(className);
 +        }
 +
 +        protected Class<?> findClass(String name) throws ClassNotFoundException
 +        {
 +            // remove the class binary - it's only used once - so it's wasting heap
 +            byte[] classData = classes.remove(name);
 +
 +            if (classData != null)
 +                return defineClass(name, classData, 0, classData.length, protectionDomain);
 +
 +            return getParent().loadClass(name);
 +        }
 +
 +        protected PermissionCollection getPermissions(CodeSource codesource)
 +        {
 +            return ThreadAwareSecurityManager.noPermissions;
 +        }
 +    }}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
index d79960f,0000000..8b448fe
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
@@@ -1,265 -1,0 +1,262 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.functions;
 +
 +import java.math.BigDecimal;
 +import java.math.BigInteger;
 +import java.net.MalformedURLException;
 +import java.net.URL;
 +import java.net.URLConnection;
 +import java.net.URLStreamHandler;
 +import java.nio.ByteBuffer;
 +import java.security.AccessControlContext;
 +import java.security.AccessController;
 +import java.security.CodeSource;
 +import java.security.PrivilegedActionException;
 +import java.security.PrivilegedExceptionAction;
 +import java.security.ProtectionDomain;
 +import java.security.cert.Certificate;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ExecutorService;
 +import javax.script.Bindings;
 +import javax.script.Compilable;
 +import javax.script.CompiledScript;
 +import javax.script.ScriptContext;
 +import javax.script.ScriptEngine;
 +import javax.script.ScriptEngineFactory;
 +import javax.script.ScriptEngineManager;
 +import javax.script.ScriptException;
 +import javax.script.SimpleScriptContext;
 +
- import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
++import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
- import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +final class ScriptBasedUDFunction extends UDFunction
 +{
 +    static final Map<String, Compilable> scriptEngines = new HashMap<>();
 +
 +    private static final ProtectionDomain protectionDomain;
 +    private static final AccessControlContext accessControlContext;
 +
 +    //
 +    // For scripted UDFs we have to rely on the security mechanisms of the scripting engine and
 +    // SecurityManager - especially SecurityManager.checkPackageAccess(). Unlike Java-UDFs, strict checking
 +    // of class access via the UDF class loader is not possible, since e.g. Nashorn builds its own class loader
 +    // (jdk.nashorn.internal.runtime.ScriptLoader / jdk.nashorn.internal.runtime.NashornLoader) configured with
 +    // a system class loader.
 +    //
 +    private static final String[] allowedPackagesArray =
 +    {
 +    // following required by jdk.nashorn.internal.objects.Global.initJavaAccess()
 +    "",
 +    "com",
 +    "edu",
 +    "java",
 +    "javax",
 +    "javafx",
 +    "org",
 +    // following required by Nashorn runtime
 +    "java.lang",
 +    "java.lang.invoke",
 +    "java.lang.reflect",
 +    "java.nio.charset",
 +    "java.util",
 +    "java.util.concurrent",
 +    "javax.script",
 +    "sun.reflect",
 +    "jdk.internal.org.objectweb.asm.commons",
 +    "jdk.nashorn.internal.runtime",
 +    "jdk.nashorn.internal.runtime.linker",
 +    // following required by Java Driver
 +    "java.math",
 +    "java.text",
 +    "com.google.common.base",
 +    "com.google.common.reflect",
 +    // following required by UDF
 +    "com.datastax.driver.core",
 +    "com.datastax.driver.core.utils"
 +    };
 +
-     private static final JMXEnabledScheduledThreadPoolExecutor executor =
-     new JMXEnabledScheduledThreadPoolExecutor(
-                                              DatabaseDescriptor.getMaxHintsThread(),
-                                              new NamedThreadFactory("UserDefinedScriptFunctions",
-                                                                     Thread.MIN_PRIORITY,
-                                                                     udfClassLoader,
-                                                                     new SecurityThreadGroup("UserDefinedScriptFunctions", Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedPackagesArray))))),
-                                              "userscripts");
++    private static final JMXEnabledThreadPoolExecutor executor =
++    new JMXEnabledThreadPoolExecutor(new NamedThreadFactory("UserDefinedScriptFunctions",
++                                                            Thread.MIN_PRIORITY,
++                                                            udfClassLoader,
++                                                            new SecurityThreadGroup("UserDefinedScriptFunctions", Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedPackagesArray))))),
++                                     "userscripts");
 +
 +    static
 +    {
 +        ScriptEngineManager scriptEngineManager = new ScriptEngineManager();
 +        for (ScriptEngineFactory scriptEngineFactory : scriptEngineManager.getEngineFactories())
 +        {
 +            ScriptEngine scriptEngine = scriptEngineFactory.getScriptEngine();
 +            boolean compilable = scriptEngine instanceof Compilable;
 +            if (compilable)
 +            {
 +                logger.info("Found scripting engine {} {} - {} {} - language names: {}",
 +                            scriptEngineFactory.getEngineName(), scriptEngineFactory.getEngineVersion(),
 +                            scriptEngineFactory.getLanguageName(), scriptEngineFactory.getLanguageVersion(),
 +                            scriptEngineFactory.getNames());
 +                for (String name : scriptEngineFactory.getNames())
 +                    scriptEngines.put(name, (Compilable) scriptEngine);
 +            }
 +        }
 +
 +        try
 +        {
 +            protectionDomain = new ProtectionDomain(new CodeSource(new URL("udf", "localhost", 0, "/script", new URLStreamHandler()
 +            {
 +                protected URLConnection openConnection(URL u)
 +                {
 +                    return null;
 +                }
 +            }), (Certificate[]) null), ThreadAwareSecurityManager.noPermissions);
 +        }
 +        catch (MalformedURLException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +        accessControlContext = new AccessControlContext(new ProtectionDomain[]{ protectionDomain });
 +    }
 +
 +    private final CompiledScript script;
 +
 +    ScriptBasedUDFunction(FunctionName name,
 +                          List<ColumnIdentifier> argNames,
 +                          List<AbstractType<?>> argTypes,
 +                          AbstractType<?> returnType,
 +                          boolean calledOnNullInput,
 +                          String language,
 +                          String body)
 +    {
 +        super(name, argNames, argTypes, returnType, calledOnNullInput, language, body);
 +
 +        Compilable scriptEngine = scriptEngines.get(language);
 +        if (scriptEngine == null)
 +            throw new InvalidRequestException(String.format("Invalid language '%s' for function '%s'", language, name));
 +
 +        // execute compilation with no-permissions to prevent evil code e.g. via "static code blocks" / "class initialization"
 +        try
 +        {
 +            this.script = AccessController.doPrivileged((PrivilegedExceptionAction<CompiledScript>) () -> scriptEngine.compile(body),
 +                                                        accessControlContext);
 +        }
 +        catch (PrivilegedActionException x)
 +        {
 +            Throwable e = x.getCause();
 +            logger.info("Failed to compile function '{}' for language {}: ", name, language, e);
 +            throw new InvalidRequestException(
 +                                             String.format("Failed to compile function '%s' for language %s: %s", name, language, e));
 +        }
 +    }
 +
 +    protected ExecutorService executor()
 +    {
 +        return executor;
 +    }
 +
 +    public ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> parameters)
 +    {
 +        Object[] params = new Object[argTypes.size()];
 +        for (int i = 0; i < params.length; i++)
 +            params[i] = compose(protocolVersion, i, parameters.get(i));
 +
 +        ScriptContext scriptContext = new SimpleScriptContext();
 +        scriptContext.setAttribute("javax.script.filename", this.name.toString(), ScriptContext.ENGINE_SCOPE);
 +        Bindings bindings = scriptContext.getBindings(ScriptContext.ENGINE_SCOPE);
 +        for (int i = 0; i < params.length; i++)
 +            bindings.put(argNames.get(i).toString(), params[i]);
 +
 +        Object result;
 +        try
 +        {
 +            // How to prevent Class.forName() _without_ "help" from the script engine ?
 +            // NOTE: Nashorn enforces a special permission to allow class-loading, which is not granted - so it's fine.
 +
 +            result = script.eval(scriptContext);
 +        }
 +        catch (ScriptException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +        if (result == null)
 +            return null;
 +
 +        Class<?> javaReturnType = returnDataType.asJavaClass();
 +        Class<?> resultType = result.getClass();
 +        if (!javaReturnType.isAssignableFrom(resultType))
 +        {
 +            if (result instanceof Number)
 +            {
 +                Number rNumber = (Number) result;
 +                if (javaReturnType == Integer.class)
 +                    result = rNumber.intValue();
 +                else if (javaReturnType == Long.class)
 +                    result = rNumber.longValue();
 +                else if (javaReturnType == Short.class)
 +                    result = rNumber.shortValue();
 +                else if (javaReturnType == Byte.class)
 +                    result = rNumber.byteValue();
 +                else if (javaReturnType == Float.class)
 +                    result = rNumber.floatValue();
 +                else if (javaReturnType == Double.class)
 +                    result = rNumber.doubleValue();
 +                else if (javaReturnType == BigInteger.class)
 +                {
 +                    if (javaReturnType == Integer.class)
 +                        result = rNumber.intValue();
 +                    else if (javaReturnType == Short.class)
 +                        result = rNumber.shortValue();
 +                    else if (javaReturnType == Byte.class)
 +                        result = rNumber.byteValue();
 +                    else if (javaReturnType == Long.class)
 +                        result = rNumber.longValue();
 +                    else if (javaReturnType == Float.class)
 +                        result = rNumber.floatValue();
 +                    else if (javaReturnType == Double.class)
 +                        result = rNumber.doubleValue();
 +                    else if (javaReturnType == BigInteger.class)
 +                    {
 +                        if (rNumber instanceof BigDecimal)
 +                            result = ((BigDecimal) rNumber).toBigInteger();
 +                        else if (rNumber instanceof Double || rNumber instanceof Float)
 +                            result = new BigDecimal(rNumber.toString()).toBigInteger();
 +                        else
 +                            result = BigInteger.valueOf(rNumber.longValue());
 +                    }
 +                    else if (javaReturnType == BigDecimal.class)
 +                        // String c'tor of BigDecimal is more accurate than valueOf(double)
 +                        result = new BigDecimal(rNumber.toString());
 +                }
 +                else if (javaReturnType == BigDecimal.class)
 +                    // String c'tor of BigDecimal is more accurate than valueOf(double)
 +                    result = new BigDecimal(rNumber.toString());
 +            }
 +        }
 +
 +        return decompose(protocolVersion, result);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65c695c9/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 17832d7,dae85b7..4656d41
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -36,13 -37,15 +36,15 @@@ import com.google.common.util.concurren
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
+ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.composites.CellName;
 -import org.apache.cassandra.db.composites.Composite;
 -import org.apache.cassandra.db.composites.Composites;
++
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.marshal.Int32Type;
  import org.apache.cassandra.db.marshal.UUIDType;
@@@ -57,9 -62,10 +59,11 @@@ import org.apache.cassandra.io.util.Dat
  import org.apache.cassandra.metrics.HintedHandoffMetrics;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.service.*;
+ import org.apache.cassandra.service.StorageProxy;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.service.WriteResponseHandler;
  import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
  import org.cliffc.high_scale_lib.NonBlockingHashSet;
  
  /**