You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/10/18 23:45:28 UTC
[08/11] drill git commit: DRILL-4726: Dynamic UDF Support
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java
new file mode 100644
index 0000000..4ebb3e2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import org.apache.drill.common.scanner.persistence.ScanResult;
+
+/**
+ * Immutable holder describing one scanned jar. It groups together:
+ * <ul>
+ *   <li>the jar name,</li>
+ *   <li>the scan of packages, classes and annotations found in the jar,</li>
+ *   <li>the unique class loader associated with the jar.</li>
+ * </ul>
+ * All three values are stored as given; no validation or copying is performed.
+ */
+public class JarScan {
+
+  private final String jarName;
+  private final ScanResult scanResult;
+  private final ClassLoader classLoader;
+
+  /**
+   * @param jarName jar name
+   * @param scanResult scan of packages, classes and annotations found in the jar
+   * @param classLoader class loader associated with the jar
+   */
+  public JarScan(String jarName, ScanResult scanResult, ClassLoader classLoader) {
+    this.classLoader = classLoader;
+    this.scanResult = scanResult;
+    this.jarName = jarName;
+  }
+
+  /**
+   * @return scan result passed at construction time
+   */
+  public ScanResult getScanResult() {
+    return this.scanResult;
+  }
+
+  /**
+   * @return class loader passed at construction time
+   */
+  public ClassLoader getClassLoader() {
+    return this.classLoader;
+  }
+
+  /**
+   * @return jar name passed at construction time
+   */
+  public String getJarName() {
+    return this.jarName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
new file mode 100644
index 0000000..03fd608
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.exception.FunctionValidationException;
+import org.apache.drill.exec.exception.JarValidationException;
+import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+import org.apache.drill.exec.expr.fn.FunctionConverter;
+import org.apache.drill.exec.planner.logical.DrillConstExecutor;
+import org.apache.drill.exec.planner.sql.DrillOperatorTable;
+import org.apache.drill.exec.planner.sql.DrillSqlAggOperator;
+import org.apache.drill.exec.planner.sql.DrillSqlAggOperatorWithoutInference;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
+import org.apache.drill.exec.planner.sql.DrillSqlOperatorWithoutInference;
+
+
+/**
+ * Registry of Drill functions.
+ * <p>
+ * Functions are grouped by the name of the jar they were scanned from; functions found on the classpath
+ * at start-up are registered under the {@link #BUILT_IN} jar name. Function names are normalized to lower
+ * case (and operator names to upper case) using {@link Locale#ROOT}, so that registration and lookup
+ * produce the same keys regardless of the JVM default locale.
+ */
+public class LocalFunctionRegistry {
+
+  public static final String BUILT_IN = "built-in";
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalFunctionRegistry.class);
+  private static final String functionSignaturePattern = "%s(%s)";
+
+  private static final ImmutableMap<String, Pair<Integer, Integer>> registeredFuncNameToArgRange = ImmutableMap.<String, Pair<Integer, Integer>> builder()
+      // CONCAT is allowed to take [1, infinity) number of arguments.
+      // Currently, this flexibility is offered by DrillOptiq to rewrite it as
+      // a nested structure
+      .put("CONCAT", Pair.of(1, Integer.MAX_VALUE))
+
+      // When LENGTH is given two arguments, this function relies on DrillOptiq to rewrite it as
+      // another function based on the second argument (encodingType)
+      .put("LENGTH", Pair.of(1, 2))
+
+      // Dummy functions
+      .put("CONVERT_TO", Pair.of(2, 2))
+      .put("CONVERT_FROM", Pair.of(2, 2))
+      .put("FLATTEN", Pair.of(1, 1)).build();
+
+  private final FunctionRegistryHolder registryHolder;
+
+  /**
+   * Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in.
+   * Built-in functions are not allowed to be unregistered.
+   *
+   * @param classpathScan scan of the classes present in the classpath
+   */
+  public LocalFunctionRegistry(ScanResult classpathScan) {
+    registryHolder = new FunctionRegistryHolder();
+    validate(BUILT_IN, classpathScan);
+    register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())));
+    if (logger.isTraceEnabled()) {
+      StringBuilder allFunctions = new StringBuilder();
+      for (DrillFuncHolder method: registryHolder.getAllFunctionsWithHolders().values()) {
+        allFunctions.append(method.toString()).append("\n");
+      }
+      logger.trace("Registered functions: [\n{}]", allFunctions);
+    }
+  }
+
+  /**
+   * @return local function registry version number
+   */
+  public long getVersion() {
+    return registryHolder.getVersion();
+  }
+
+  /**
+   * Validates all functions present in the given jar scan. Nothing is registered by this method.
+   * <p>
+   * Throws {@link JarValidationException} if a jar with the same name has already been registered.
+   * Throws {@link FunctionValidationException} if:
+   * <ol>
+   *   <li>A function with the same signature is already known.</li>
+   *   <li>An aggregating function is not deterministic.</li>
+   * </ol>
+   * Classes for which no function holder can be created are logged with a warning and skipped.
+   *
+   * @param jarName jar name to be validated
+   * @param scanResult scan of all classes present in jar
+   * @return list of validated function signatures
+   */
+  public List<String> validate(String jarName, ScanResult scanResult) {
+    List<String> functions = Lists.newArrayList();
+    FunctionConverter converter = new FunctionConverter();
+    List<AnnotatedClassDescriptor> providerClasses = scanResult.getAnnotatedClasses();
+
+    if (registryHolder.containsJar(jarName)) {
+      throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName));
+    }
+
+    final ListMultimap<String, String> allFuncWithSignatures = registryHolder.getAllFunctionsWithSignatures();
+
+    for (AnnotatedClassDescriptor func : providerClasses) {
+      DrillFuncHolder holder = converter.getHolder(func, ClassLoader.getSystemClassLoader());
+      if (holder == null) {
+        logger.warn("Unable to initialize function for class {}", func.getClassName());
+        continue;
+      }
+
+      String functionInput = holder.getInputParameters();
+      for (String name : holder.getRegisteredNames()) {
+        String functionName = normalizeName(name);
+        String functionSignature = buildSignature(functionName, functionInput);
+
+        if (allFuncWithSignatures.get(functionName).contains(functionSignature)) {
+          throw new FunctionValidationException(String.format("Found duplicated function in %s: %s",
+              registryHolder.getJarNameByFunctionSignature(functionName, functionSignature), functionSignature));
+        }
+        if (holder.isAggregating() && !holder.isDeterministic()) {
+          throw new FunctionValidationException(
+              String.format("Aggregate functions must be deterministic: %s", func.getClassName()));
+        }
+
+        functions.add(functionSignature);
+        // remember the accepted signature so that a repeat later in the same scan hits the duplicate check
+        allFuncWithSignatures.put(functionName, functionSignature);
+      }
+    }
+    return functions;
+  }
+
+  /**
+   * Registers all functions present in jar.
+   * If jar name is already registered, all jar related functions will be overridden.
+   * To prevent classpath collisions during loading and unloading jars,
+   * each jar is shipped with its own class loader.
+   * Classes for which no function holder can be created are skipped.
+   *
+   * @param jars list of jars to be registered
+   */
+  public void register(List<JarScan> jars) {
+    Map<String, List<FunctionHolder>> newJars = Maps.newHashMap();
+    for (JarScan jarScan : jars) {
+      FunctionConverter converter = new FunctionConverter();
+      List<AnnotatedClassDescriptor> providerClasses = jarScan.getScanResult().getAnnotatedClasses();
+      List<FunctionHolder> functions = Lists.newArrayList();
+      newJars.put(jarScan.getJarName(), functions);
+      for (AnnotatedClassDescriptor func : providerClasses) {
+        DrillFuncHolder holder = converter.getHolder(func, jarScan.getClassLoader());
+        if (holder == null) {
+          continue;
+        }
+        String functionInput = holder.getInputParameters();
+        for (String name : holder.getRegisteredNames()) {
+          String functionName = normalizeName(name);
+          functions.add(new FunctionHolder(functionName, buildSignature(functionName, functionInput), holder));
+        }
+      }
+    }
+    registryHolder.addJars(newJars);
+  }
+
+  /**
+   * Removes all functions associated with the given jar name.
+   * Functions marked as built-in are not allowed to be unregistered.
+   * If user attempts to unregister built-in functions, logs warning and does nothing.
+   * Jar name is case-sensitive.
+   *
+   * @param jarName jar name to be unregistered
+   */
+  public void unregister(String jarName) {
+    if (BUILT_IN.equals(jarName)) {
+      logger.warn("Functions marked as built-in are not allowed to be unregistered.");
+      return;
+    }
+    registryHolder.removeJar(jarName);
+  }
+
+  /**
+   * Returns list of jar names registered in function registry.
+   *
+   * @return list of jar names
+   */
+  public List<String> getAllJarNames() {
+    return registryHolder.getAllJarNames();
+  }
+
+  /**
+   * @return quantity of all registered functions
+   */
+  public int size(){
+    return registryHolder.functionsSize();
+  }
+
+  /**
+   * @param name function name. Function name is case insensitive.
+   * @param version holder handed to the underlying registry holder together with the lookup;
+   *                presumably receives the registry version used for the lookup (confirm against
+   *                {@code FunctionRegistryHolder})
+   * @return all function holders associated with the function name
+   */
+  public List<DrillFuncHolder> getMethods(String name, AtomicLong version) {
+    return registryHolder.getHoldersByFunctionName(normalizeName(name), version);
+  }
+
+  /**
+   * @param name function name. Function name is case insensitive.
+   * @return all function holders associated with the function name
+   */
+  public List<DrillFuncHolder> getMethods(String name) {
+    return registryHolder.getHoldersByFunctionName(normalizeName(name));
+  }
+
+  /**
+   * Registers all functions present in {@link DrillOperatorTable},
+   * also sets local registry version used at the moment of registering.
+   *
+   * @param operatorTable drill operator table
+   */
+  public void register(DrillOperatorTable operatorTable) {
+    AtomicLong versionHolder = new AtomicLong();
+    final Map<String, Collection<DrillFuncHolder>> registeredFunctions = registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
+    operatorTable.setFunctionRegistryVersion(versionHolder.get());
+    registerOperatorsWithInference(operatorTable, registeredFunctions);
+    registerOperatorsWithoutInference(operatorTable, registeredFunctions);
+  }
+
+  /**
+   * Builds one operator per function name (scalar and aggregate separately) and adds them
+   * to the operator table as operators with inference.
+   *
+   * @param operatorTable drill operator table to add the operators to
+   * @param registeredFunctions function holders grouped by function name
+   */
+  private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) {
+    final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = Maps.newHashMap();
+    final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = Maps.newHashMap();
+    for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) {
+      final ArrayListMultimap<Pair<Integer, Integer>, DrillFuncHolder> functions = ArrayListMultimap.create();
+      final ArrayListMultimap<Integer, DrillFuncHolder> aggregateFunctions = ArrayListMultimap.create();
+      final String name = function.getKey().toUpperCase(Locale.ROOT);
+      // a single non-deterministic implementation makes the whole scalar operator non-deterministic
+      boolean isDeterministic = true;
+      for (DrillFuncHolder func : function.getValue()) {
+        final int paramCount = func.getParamCount();
+        if (func.isAggregating()) {
+          aggregateFunctions.put(paramCount, func);
+        } else {
+          // functions listed in registeredFuncNameToArgRange accept a fixed, name-based argument range;
+          // all others accept exactly their declared parameter count
+          final Pair<Integer, Integer> argNumberRange;
+          if (registeredFuncNameToArgRange.containsKey(name)) {
+            argNumberRange = registeredFuncNameToArgRange.get(name);
+          } else {
+            argNumberRange = Pair.of(paramCount, paramCount);
+          }
+          functions.put(argNumberRange, func);
+        }
+
+        if (!func.isDeterministic()) {
+          isDeterministic = false;
+        }
+      }
+      for (Entry<Pair<Integer, Integer>, Collection<DrillFuncHolder>> entry : functions.asMap().entrySet()) {
+        final Pair<Integer, Integer> range = entry.getKey();
+        final int max = range.getRight();
+        final int min = range.getLeft();
+        if (!map.containsKey(name)) {
+          map.put(name, new DrillSqlOperator.DrillSqlOperatorBuilder()
+              .setName(name));
+        }
+
+        final DrillSqlOperator.DrillSqlOperatorBuilder drillSqlOperatorBuilder = map.get(name);
+        drillSqlOperatorBuilder
+            .addFunctions(entry.getValue())
+            .setArgumentCount(min, max)
+            .setDeterministic(isDeterministic);
+      }
+      for (Entry<Integer, Collection<DrillFuncHolder>> entry : aggregateFunctions.asMap().entrySet()) {
+        if (!mapAgg.containsKey(name)) {
+          mapAgg.put(name, new DrillSqlAggOperator.DrillSqlAggOperatorBuilder().setName(name));
+        }
+
+        final DrillSqlAggOperator.DrillSqlAggOperatorBuilder drillSqlAggOperatorBuilder = mapAgg.get(name);
+        drillSqlAggOperatorBuilder
+            .addFunctions(entry.getValue())
+            .setArgumentCount(entry.getKey(), entry.getKey());
+      }
+    }
+
+    for (final Entry<String, DrillSqlOperator.DrillSqlOperatorBuilder> entry : map.entrySet()) {
+      operatorTable.addOperatorWithInference(
+          entry.getKey(),
+          entry.getValue().build());
+    }
+
+    for (final Entry<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> entry : mapAgg.entrySet()) {
+      operatorTable.addOperatorWithInference(
+          entry.getKey(),
+          entry.getValue().build());
+    }
+  }
+
+  /**
+   * Adds one operator without inference per distinct (function name, parameter count) combination.
+   * Only the first holder seen for a given parameter count is used to create the operator.
+   *
+   * @param operatorTable drill operator table to add the operators to
+   * @param registeredFunctions function holders grouped by function name
+   */
+  private void registerOperatorsWithoutInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) {
+    for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) {
+      Set<Integer> argCounts = Sets.newHashSet();
+      String name = function.getKey().toUpperCase(Locale.ROOT);
+      for (DrillFuncHolder func : function.getValue()) {
+        if (!argCounts.add(func.getParamCount())) {
+          // an operator for this parameter count has already been added
+          continue;
+        }
+        final SqlOperator op;
+        if (func.isAggregating()) {
+          op = new DrillSqlAggOperatorWithoutInference(name, func.getParamCount());
+        } else {
+          boolean isDeterministic;
+          // prevent Drill from folding constant functions with types that cannot be materialized
+          // into literals
+          if (DrillConstExecutor.NON_REDUCIBLE_TYPES.contains(func.getReturnType().getMinorType())) {
+            isDeterministic = false;
+          } else {
+            isDeterministic = func.isDeterministic();
+          }
+          op = new DrillSqlOperatorWithoutInference(name, func.getParamCount(), func.getReturnType(), isDeterministic);
+        }
+        operatorTable.addOperatorWithoutInference(function.getKey(), op);
+      }
+    }
+  }
+
+  /** Lower-cases a function name independently of the JVM default locale. */
+  private static String normalizeName(String name) {
+    return name.toLowerCase(Locale.ROOT);
+  }
+
+  /** Builds the function signature {@code name(input)} used as the uniqueness key of a function. */
+  private static String buildSignature(String functionName, String functionInput) {
+    return String.format(functionSignaturePattern, functionName, functionInput);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
new file mode 100644
index 0000000..4ce4a19
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreListener;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.Registry;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.File;
+import java.io.IOException;
+
+import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
+
+/**
+ * Is responsible for remote function registry management.
+ * Creates all remote registry areas at startup and validates them,
+ * during init establishes connections with three udf related stores.
+ * Provides tools to work with three udf related stores, gives access to remote registry areas.
+ *
+ * There are three udf stores:
+ * REGISTRY - persistent store, stores remote function registry {@link Registry} under udf path
+ * which contains information about all dynamically registered jars and their function signatures.
+ * If connection is created for the first time, puts empty remote registry.
+ *
+ * UNREGISTRATION - transient store, stores information under udf/unregister path.
+ * udf/unregister path is persistent by itself but any child created will be transient.
+ * Whenever user submits request to unregister jar, child path with jar name is created under this store.
+ * This store also holds unregistration listener, which notifies all drill bits when child path is created,
+ * so they can start local unregistration process.
+ *
+ * JARS - transient store, stores information under udf/jars path.
+ * udf/jars path is persistent by itself but any child created will be transient.
+ * Serves as a lock, not allowing two actions to be performed on the same jar at the same time.
+ * There are two types of actions: {@link Action#REGISTRATION} and {@link Action#UNREGISTRATION}.
+ * Before starting any action, user tries to create child path with jar name under this store
+ * and if such path already exists, receives action being performed on that very jar.
+ * When user finishes the action, the child path with jar name is deleted.
+ *
+ * There are three udf areas:
+ * STAGING - area where user copies binary and source jars before starting registration process.
+ * REGISTRY - area where registered jars are stored.
+ * TMP - area where source and binary jars are backed up in unique folder during registration process.
+ */
+public class RemoteFunctionRegistry implements AutoCloseable {
+
+  public static final String REGISTRY = "registry";
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFunctionRegistry.class);
+  private static final ObjectMapper mapper = new ObjectMapper().enable(INDENT_OUTPUT);
+
+  private final TransientStoreListener unregistrationListener;
+  private int retryAttempts;
+  private FileSystem fs;
+  private Path registryArea;
+  private Path stagingArea;
+  private Path tmpArea;
+
+  private PersistentStore<Registry> registry;
+  private TransientStore<String> unregistration;
+  private TransientStore<String> jars;
+
+  /**
+   * Stores the listener only; stores and areas are set up later by {@link #init}.
+   *
+   * @param unregistrationListener listener to be added to the UNREGISTRATION store during init
+   */
+  public RemoteFunctionRegistry(TransientStoreListener unregistrationListener) {
+    this.unregistrationListener = unregistrationListener;
+  }
+
+  /**
+   * Connects to the udf stores, creates and validates the udf areas
+   * and reads the number of retry attempts from the configuration.
+   *
+   * @param config drill config
+   * @param storeProvider provider used to obtain the persistent REGISTRY store
+   * @param coordinator cluster coordinator used to obtain the transient stores
+   */
+  public void init(DrillConfig config, PersistentStoreProvider storeProvider, ClusterCoordinator coordinator) {
+    prepareStores(storeProvider, coordinator);
+    prepareAreas(config);
+    this.retryAttempts = config.getInt(ExecConstants.UDF_RETRY_ATTEMPTS);
+  }
+
+  /**
+   * @return remote function registry content stored under the {@link #REGISTRY} key
+   */
+  public Registry getRegistry() {
+    return registry.get(REGISTRY, null);
+  }
+
+  /**
+   * @param version holder passed to the store together with the read request
+   * @return remote function registry content stored under the {@link #REGISTRY} key
+   */
+  public Registry getRegistry(DataChangeVersion version) {
+    return registry.get(REGISTRY, version);
+  }
+
+  /**
+   * Puts the given content into the REGISTRY store under the {@link #REGISTRY} key.
+   *
+   * @param registryContent new remote function registry content
+   * @param version version passed to the store together with the write request
+   * @throws VersionMismatchException declared by this method; raised by the underlying store
+   */
+  public void updateRegistry(Registry registryContent, DataChangeVersion version) throws VersionMismatchException {
+    registry.put(REGISTRY, registryContent, version);
+  }
+
+  /**
+   * Adds the jar name to the UNREGISTRATION store if it is not there yet.
+   *
+   * @param jar jar name
+   */
+  public void submitForUnregistration(String jar) {
+    unregistration.putIfAbsent(jar, jar);
+  }
+
+  /**
+   * Removes the jar name from the UNREGISTRATION store.
+   *
+   * @param jar jar name
+   */
+  public void finishUnregistration(String jar) {
+    unregistration.remove(jar);
+  }
+
+  /**
+   * Adds the jar name with the given action to the JARS store if the jar is not there yet.
+   *
+   * @param jar jar name
+   * @param action action to be performed on the jar
+   * @return value returned by the store's {@code putIfAbsent}; per the class description this is the action
+   *         already being performed on the jar, if any
+   */
+  public String addToJars(String jar, Action action) {
+    return jars.putIfAbsent(jar, action.toString());
+  }
+
+  /**
+   * Removes the jar name from the JARS store.
+   *
+   * @param jar jar name
+   */
+  public void removeFromJars(String jar) {
+    jars.remove(jar);
+  }
+
+  public int getRetryAttempts() {
+    return retryAttempts;
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public Path getRegistryArea() {
+    return registryArea;
+  }
+
+  public Path getStagingArea() {
+    return stagingArea;
+  }
+
+  public Path getTmpArea() {
+    return tmpArea;
+  }
+
+  /**
+   * Connects to three stores: REGISTRY, UNREGISTRATION, JARS.
+   * Puts default instance of remote function registry into REGISTRY store if nothing is stored there yet.
+   * Registers unregistration listener in UNREGISTRATION store.
+   */
+  private void prepareStores(PersistentStoreProvider storeProvider, ClusterCoordinator coordinator) {
+    try {
+      PersistentStoreConfig<Registry> registrationConfig = PersistentStoreConfig
+          .newProtoBuilder(SchemaUserBitShared.Registry.WRITE, SchemaUserBitShared.Registry.MERGE)
+          .name("udf")
+          .persist()
+          .build();
+      registry = storeProvider.getOrCreateStore(registrationConfig);
+      registry.putIfAbsent(REGISTRY, Registry.getDefaultInstance());
+    } catch (StoreException e) {
+      throw new DrillRuntimeException("Failure while loading remote registry.", e);
+    }
+
+    TransientStoreConfig<String> unregistrationConfig = TransientStoreConfig.
+        newJacksonBuilder(mapper, String.class).name("udf/unregister").build();
+    unregistration = coordinator.getOrCreateTransientStore(unregistrationConfig);
+    unregistration.addListener(unregistrationListener);
+
+    TransientStoreConfig<String> jarsConfig = TransientStoreConfig.
+        newJacksonBuilder(mapper, String.class).name("udf/jars").build();
+    jars = coordinator.getOrCreateTransientStore(jarsConfig);
+  }
+
+  /**
+   * Creates if absent and validates three udf areas: STAGING, REGISTRY and TMP.
+   * Takes udf areas root from {@link ExecConstants#UDF_DIRECTORY_ROOT},
+   * if not set, uses user home directory instead.
+   *
+   * @throws DrillRuntimeException if the file system cannot be set up or an area is not valid
+   */
+  private void prepareAreas(DrillConfig config) {
+    Configuration conf = new Configuration();
+    if (config.hasPath(ExecConstants.UDF_DIRECTORY_FS)) {
+      conf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getString(ExecConstants.UDF_DIRECTORY_FS));
+    }
+
+    try {
+      this.fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      // thrown explicitly so that a failed set-up can never fall through to the code below with fs == null
+      throw new DrillRuntimeException(
+          String.format("Error during file system %s setup", conf.get(FileSystem.FS_DEFAULT_NAME_KEY)), e);
+    }
+
+    String root = fs.getHomeDirectory().toUri().getPath();
+    if (config.hasPath(ExecConstants.UDF_DIRECTORY_ROOT)) {
+      root = config.getString(ExecConstants.UDF_DIRECTORY_ROOT);
+    }
+
+    this.registryArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_REGISTRY));
+    this.stagingArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_STAGING));
+    this.tmpArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_TMP));
+  }
+
+  /**
+   * Concatenates udf area with root directory.
+   * Creates udf area, if area does not exist.
+   * Checks if area exists and is directory, if it is writable for current user,
+   * throws {@link DrillRuntimeException} otherwise.
+   *
+   * @param fs file system where area should be created or checked
+   * @param root root directory
+   * @param directory directory path
+   * @return path to area
+   * @throws DrillRuntimeException if the area cannot be created or does not pass the checks
+   */
+  private Path createArea(FileSystem fs, String root, String directory) {
+    Path path = new Path(new File(root, directory).toURI().getPath());
+    String fullPath = path.toUri().getPath();
+    try {
+      fs.mkdirs(path);
+      Preconditions.checkState(fs.exists(path), "Area [%s] must exist", fullPath);
+      FileStatus fileStatus = fs.getFileStatus(path);
+      Preconditions.checkState(fileStatus.isDirectory(), "Area [%s] must be a directory", fullPath);
+      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      FsPermission permission = fileStatus.getPermission();
+      // It is considered that current user has write rights on directory if:
+      // 1. current user is owner of the directory and has write rights
+      // 2. current user is in group that has write rights
+      // 3. any user has write rights
+      // NOTE(review): only WRITE is checked here although the message also mentions "executable"
+      Preconditions.checkState(
+          (currentUser.getUserName().equals(fileStatus.getOwner())
+              && permission.getUserAction().implies(FsAction.WRITE)) ||
+          (Lists.newArrayList(currentUser.getGroupNames()).contains(fileStatus.getGroup())
+              && permission.getGroupAction().implies(FsAction.WRITE)) ||
+          permission.getOtherAction().implies(FsAction.WRITE),
+          "Area [%s] must be writable and executable for application user", fullPath);
+    } catch (Exception e) {
+      // thrown explicitly so that a path that failed creation or validation is never returned
+      throw new DrillRuntimeException(
+          String.format("Error during udf area creation [%s] on file system [%s]", fullPath, fs.getUri()), e);
+    }
+    return path;
+  }
+
+  /**
+   * Closes the file system and the three stores. A failure during close is logged as a warning
+   * and not propagated.
+   */
+  @Override
+  public void close() {
+    try {
+      AutoCloseables.close(
+          fs,
+          registry,
+          unregistration,
+          jars);
+    } catch (Exception e) {
+      logger.warn("Failure on close()", e);
+    }
+  }
+
+  /** Types of actions that can be performed on a jar, see JARS store description. */
+  public enum Action {
+    REGISTRATION,
+    UNREGISTRATION
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 44e33cb..ceb1224 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
@@ -222,6 +223,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
return queryContextInfo;
}
+  /**
+   * @return remote function registry obtained from the drillbit context
+   */
+  public RemoteFunctionRegistry getRemoteFunctionRegistry() {
+    return this.drillbitContext.getRemoteFunctionRegistry();
+  }
+
@Override
public ContextInformation getContextInformation() {
return contextInformation;
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index 5f489b4..6944a7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -22,9 +22,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlPrefixOperator;
import org.apache.drill.common.expression.FunctionCallFactory;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.calcite.sql.SqlFunctionCategory;
@@ -35,10 +33,10 @@ import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.SystemOptionManager;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Implementation of {@link SqlOperatorTable} that contains standard operators and functions provided through
@@ -54,6 +52,9 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithoutInferenceMap = ArrayListMultimap.create();
private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithInferenceMap = ArrayListMultimap.create();
+ // Local function registry version from which the Drill operators were loaded.
+ // Used to decide whether the operator table needs to be reloaded when a function signature is not found.
+ private long functionRegistryVersion;
private final OptionManager systemOptionManager;
@@ -64,6 +65,23 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
this.systemOptionManager = systemOptionManager;
}
+ /**
+ * Cleans up all operator holders and reloads operators.
+ * Clears the four operator holders of this table (with and without inference, plus their
+ * name-keyed multimaps) and then calls {@code registry.register(this)} on the given registry.
+ * This method does not modify the stored function registry version.
+ * NOTE(review): no synchronization is visible here - confirm the table is not read concurrently during reload.
+ *
+ * @param registry function registry that is asked to register its functions into this table
+ */
+ public void reloadOperators(FunctionImplementationRegistry registry) {
+ drillOperatorsWithoutInference.clear();
+ drillOperatorsWithInference.clear();
+ drillOperatorsWithoutInferenceMap.clear();
+ drillOperatorsWithInferenceMap.clear();
+ registry.register(this);
+ }
+
+ /**
+ * Stores the local function registry version this operator table corresponds to.
+ *
+ * @param version function registry version to remember
+ * @return the version that has just been assigned
+ */
+ public long setFunctionRegistryVersion(long version) {
+ return functionRegistryVersion = version;
+ }
+
+ /**
+ * @return the local function registry version currently stored in this operator table
+ */
+ public long getFunctionRegistryVersion() {
+ return functionRegistryVersion;
+ }
+
/**
* When the option planner.type_inference.enable is turned off, the operators which are added via this method
* will be used.
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index dbe620d..19123d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -24,6 +24,9 @@ import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.FunctionNotFoundException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.physical.PhysicalPlan;
@@ -91,7 +94,7 @@ public class DrillSqlWorker {
}
try {
- return handler.getPlan(sqlNode);
+ return getPhysicalPlan(handler, sqlNode, context);
} catch(ValidationException e) {
String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
throw UserException.validationError(e)
@@ -108,5 +111,27 @@ public class DrillSqlWorker {
}
}
+ /**
+ * Returns query physical plan.
+ * In case of {@link FunctionNotFoundException} and dynamic udf support is enabled, attempts to load remote functions.
+ * If at least one function was loaded or local function function registry version has changed,
+ * makes one more attempt to get query physical plan.
+ */
+ private static PhysicalPlan getPhysicalPlan(AbstractSqlHandler handler, SqlNode sqlNode, QueryContext context)
+ throws RelConversionException, IOException, ForemanSetupException, ValidationException {
+ try {
+ return handler.getPlan(sqlNode);
+ } catch (FunctionNotFoundException e) {
+ if (context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
+ DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable();
+ FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry();
+ if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) {
+ drillOperatorTable.reloadOperators(functionRegistry);
+ return handler.getPlan(sqlNode);
+ }
+ }
+ throw e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 3d0d538..0c3c6a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -54,11 +54,15 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.sql.validate.AggregatingSelectScope;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.FunctionNotFoundException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.planner.cost.DrillCostBase;
@@ -160,6 +164,11 @@ public class SqlConverter {
SqlNode validatedNode = validator.validate(parsedNode);
return validatedNode;
} catch (RuntimeException e) {
+ final Throwable rootCause = ExceptionUtils.getRootCause(e);
+ if (rootCause instanceof SqlValidatorException
+ && StringUtils.contains(rootCause.getMessage(), "No match found for function signature")) {
+ throw new FunctionNotFoundException(rootCause.getMessage(), e);
+ }
UserException.Builder builder = UserException
.validationError(e)
.addContext("SQL Query", sql);
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
new file mode 100644
index 0000000..8515c8a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.FunctionValidationException;
+import org.apache.drill.exec.exception.JarValidationException;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.parser.SqlCreateFunction;
+import org.apache.drill.exec.proto.UserBitShared.Jar;
+import org.apache.drill.exec.proto.UserBitShared.Registry;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+import org.apache.drill.exec.util.JarUtil;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+public class CreateFunctionHandler extends DefaultSqlHandler {
+
+ private static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CreateFunctionHandler.class);
+
+ public CreateFunctionHandler(SqlHandlerConfig config) {
+ super(config);
+ }
+
+  /**
+   * Registers UDFs dynamically. Process consists of several steps:
+   * <ol>
+   * <li>Registering jar in jar registry to ensure that several jars with the same name are not registered
+   * at the same time.</li>
+   * <li>Binary and source jars validation and back up.</li>
+   * <li>Validation against local function registry.</li>
+   * <li>Validation against remote function registry.</li>
+   * <li>Remote function registry update.</li>
+   * <li>Copying of jars to registry area and clean up.</li>
+   * </ol>
+   *
+   * UDFs registration is allowed only if dynamic UDFs support is enabled.
+   * Any exception raised during registration is logged and reported back as an unsuccessful result row.
+   * The jar name is removed from the jar registry (if this call added it) and the temporary folders
+   * are cleaned up in all cases.
+   *
+   * @param sqlNode parsed CREATE FUNCTION statement
+   * @return - Single row indicating list of registered UDFs, or error message otherwise.
+   * @throws UserException if dynamic UDFs support is disabled
+   */
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException {
+    if (!context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
+      throw UserException.validationError()
+          .message("Dynamic UDFs support is disabled.")
+          .build(logger);
+    }
+    RemoteFunctionRegistry remoteRegistry = context.getRemoteFunctionRegistry();
+    JarManager jarManager = new JarManager(sqlNode, remoteRegistry);
+
+    // true only when this call has added the jar name to the jar registry and therefore must remove it
+    boolean inProgress = false;
+    try {
+      final String action = remoteRegistry.addToJars(jarManager.getBinaryName(), RemoteFunctionRegistry.Action.REGISTRATION);
+      inProgress = action == null;
+      if (!inProgress) {
+        return DirectPlan.createDirectPlan(context, false,
+            String.format("Jar with %s name is used. Action: %s", jarManager.getBinaryName(), action));
+      }
+
+      jarManager.initRemoteBackup();
+      List<String> functions = validateAgainstLocalRegistry(jarManager, context.getFunctionRegistry());
+      initRemoteRegistration(functions, jarManager, remoteRegistry, remoteRegistry.getRetryAttempts());
+      jarManager.deleteQuietlyFromStagingArea();
+
+      return DirectPlan.createDirectPlan(context, true,
+          String.format("The following UDFs in jar %s have been registered:\n%s", jarManager.getBinaryName(), functions));
+
+    } catch (Exception e) {
+      logger.error("Error during UDF registration", e);
+      return DirectPlan.createDirectPlan(context, false, e.getMessage());
+    } finally {
+      try {
+        if (inProgress) {
+          remoteRegistry.removeFromJars(jarManager.getBinaryName());
+        }
+      } finally {
+        // temporary folders must be removed even if releasing the jar name fails
+        jarManager.cleanUp();
+      }
+    }
+  }
+
+
+  /**
+   * Copies the binary jar to the local file system
+   * and validates the functions from this jar against the local function registry.
+   *
+   * @param jarManager helper that copies the binary to the local file system
+   * @param localFunctionRegistry local function registry instance that performs the validation
+   * @return list of validated function signatures
+   * @throws IOException in case of problems during copying binary to local file system
+   * @throws FunctionValidationException in case duplicated function was found
+   */
+  private List<String> validateAgainstLocalRegistry(JarManager jarManager,
+                                                    FunctionImplementationRegistry localFunctionRegistry) throws IOException {
+    return localFunctionRegistry.validate(jarManager.copyBinaryToLocal());
+  }
+
+ /**
+ * Validates jar and its functions against remote jars.
+ * First checks if there is no duplicate by jar name and then looks for duplicates among functions.
+ *
+ * @param remoteJars list of remote jars to validate against
+ * @param jarName jar name to be validated
+ * @param functions list of functions present in jar to be validated
+ * @throws JarValidationException in case of jar with the same name was found
+ * @throws FunctionValidationException in case duplicated function was found
+ */
+ private void validateAgainstRemoteRegistry(List<Jar> remoteJars, String jarName, List<String> functions) {
+ for (Jar remoteJar : remoteJars) {
+ if (remoteJar.getName().equals(jarName)) {
+ throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName));
+ }
+ for (String remoteFunction : remoteJar.getFunctionSignatureList()) {
+ for (String func : functions) {
+ if (remoteFunction.equals(func)) {
+ throw new FunctionValidationException(
+ String.format("Found duplicated function in %s: %s", remoteJar.getName(), remoteFunction));
+ }
+ }
+ }
+ }
+ }
+
+  /**
+   * Performs remote registration. On each attempt it first gets the remote function registry with its version.
+   * Version is used to ensure that we update the same registry we validated against.
+   * Then validates against the list of remote jars and, on the first attempt only, copies the jars
+   * to the registry area. If validation is successful, attempts to update the remote function registry.
+   * If during update {@link VersionMismatchException} is detected, the whole attempt is repeated,
+   * since the remote registry version has changed and we need to re-validate against it one more time.
+   * Each repeated attempt decreases the retry attempts counter by one.
+   * If retry attempts number hits 0, throws exception that failed to update remote function registry.
+   * <p/>
+   * Jars copied to the registry area are removed again only if the registration as a whole did not succeed.
+   * A successful retry therefore keeps the jars in place.
+   *
+   * @param functions list of functions present in jar
+   * @param jarManager helper class for copying jars to registry area
+   * @param remoteRegistry remote function registry
+   * @param retryAttempts number of retry attempts
+   * @throws IOException in case of problems with copying jars to registry area
+   * @throws DrillRuntimeException if the retry attempts limit was exceeded
+   */
+  private void initRemoteRegistration(List<String> functions,
+                                      JarManager jarManager,
+                                      RemoteFunctionRegistry remoteRegistry,
+                                      int retryAttempts) throws IOException {
+    boolean jarsCopied = false;
+    boolean registered = false;
+    try {
+      while (true) {
+        DataChangeVersion version = new DataChangeVersion();
+        List<Jar> remoteJars = remoteRegistry.getRegistry(version).getJarList();
+        validateAgainstRemoteRegistry(remoteJars, jarManager.getBinaryName(), functions);
+        if (!jarsCopied) {
+          jarManager.copyToRegistryArea();
+          jarsCopied = true;
+        }
+        List<Jar> jars = Lists.newArrayList(remoteJars);
+        jars.add(Jar.newBuilder().setName(jarManager.getBinaryName()).addAllFunctionSignature(functions).build());
+        Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build();
+        try {
+          remoteRegistry.updateRegistry(updatedRegistry, version);
+          registered = true;
+          return;
+        } catch (VersionMismatchException ex) {
+          if (retryAttempts-- == 0) {
+            throw new DrillRuntimeException("Failed to update remote function registry. Exceeded retry attempts limit.");
+          }
+          // registry was changed concurrently: loop to re-read, re-validate and try the update again
+        }
+      }
+    } finally {
+      // a single clean up decision for all attempts, so a successful retry never loses its jars
+      if (jarsCopied && !registered) {
+        jarManager.deleteQuietlyFromRegistryArea();
+      }
+    }
+  }
+
+  /**
+   * Inner helper class that encapsulates logic for working with jars.
+   * During initialization it builds the paths to the staging jars, the local and remote temporary jars
+   * and the registry jars. It is responsible for validation, copying and deletion actions.
+   */
+  private class JarManager {
+
+    private final String binaryName;
+    private final FileSystem fs;
+
+    private final Path remoteTmpDir;
+    private final Path localTmpDir;
+
+    private final Path stagingBinary;
+    private final Path stagingSource;
+
+    private final Path tmpRemoteBinary;
+    private final Path tmpRemoteSource;
+
+    private final Path registryBinary;
+    private final Path registrySource;
+
+    /**
+     * Resolves all jar locations for the jar named in the given statement.
+     * Remote temporary jars are placed in a folder with a random UUID name inside the remote temporary area.
+     * A new local temporary directory is created as part of construction.
+     *
+     * @param sqlNode parsed CREATE FUNCTION statement holding the binary jar name
+     * @param remoteRegistry remote function registry that provides the staging, temporary and registry areas
+     *                       and the file system
+     * @throws ForemanSetupException declared for the unwrapping of the statement node
+     */
+    JarManager(SqlNode sqlNode, RemoteFunctionRegistry remoteRegistry) throws ForemanSetupException {
+      SqlCreateFunction node = unwrap(sqlNode, SqlCreateFunction.class);
+      this.binaryName = ((SqlCharStringLiteral) node.getJar()).toValue();
+      String sourceName = JarUtil.getSourceName(binaryName);
+
+      this.stagingBinary = new Path(remoteRegistry.getStagingArea(), binaryName);
+      this.stagingSource = new Path(remoteRegistry.getStagingArea(), sourceName);
+
+      this.remoteTmpDir = new Path(remoteRegistry.getTmpArea(), UUID.randomUUID().toString());
+      this.tmpRemoteBinary = new Path(remoteTmpDir, binaryName);
+      this.tmpRemoteSource = new Path(remoteTmpDir, sourceName);
+
+      this.registryBinary = new Path(remoteRegistry.getRegistryArea(), binaryName);
+      this.registrySource = new Path(remoteRegistry.getRegistryArea(), sourceName);
+
+      this.localTmpDir = new Path(Files.createTempDir().toURI());
+      this.fs = remoteRegistry.getFs();
+    }
+
+    /**
+     * @return binary jar name
+     */
+    String getBinaryName() {
+      return binaryName;
+    }
+
+    /**
+     * Validates that both binary and source jar are present in staging area,
+     * it is expected that binary and source have standard naming convention.
+     * Backs up both jars to unique folder in remote temporary area.
+     *
+     * @throws IOException in case of binary or source absence or problems during copying jars
+     */
+    void initRemoteBackup() throws IOException {
+      // the file status lookups serve as existence checks for both staging jars
+      fs.getFileStatus(stagingBinary);
+      fs.getFileStatus(stagingSource);
+      fs.mkdirs(remoteTmpDir);
+      FileUtil.copy(fs, stagingBinary, fs, tmpRemoteBinary, false, true, fs.getConf());
+      FileUtil.copy(fs, stagingSource, fs, tmpRemoteSource, false, true, fs.getConf());
+    }
+
+    /**
+     * Copies binary jar to unique folder on local file system.
+     * Source jar is not needed for local validation.
+     *
+     * @return path to local binary jar
+     * @throws IOException in case of problems during copying binary jar
+     */
+    Path copyBinaryToLocal() throws IOException {
+      Path localBinary = new Path(localTmpDir, binaryName);
+      fs.copyToLocalFile(tmpRemoteBinary, localBinary);
+      return localBinary;
+    }
+
+    /**
+     * Copies binary and source jars to registry area.
+     * If copying of the source jar fails with {@link IOException},
+     * removes the already copied binary jar from registry area.
+     *
+     * @throws IOException the original exception, re-thrown in case of problems during copying process
+     */
+    void copyToRegistryArea() throws IOException {
+      FileUtil.copy(fs, tmpRemoteBinary, fs, registryBinary, false, true, fs.getConf());
+      try {
+        FileUtil.copy(fs, tmpRemoteSource, fs, registrySource, false, true, fs.getConf());
+      } catch (IOException e) {
+        deleteQuietly(registryBinary, false);
+        // re-throw as is: wrapping would hide the concrete exception type and distort its message
+        throw e;
+      }
+    }
+
+    /**
+     * Deletes binary and sources jars from staging area, in case of problems, logs warning and proceeds.
+     */
+    void deleteQuietlyFromStagingArea() {
+      deleteQuietly(stagingBinary, false);
+      deleteQuietly(stagingSource, false);
+    }
+
+    /**
+     * Deletes binary and sources jars from registry area, in case of problems, logs warning and proceeds.
+     */
+    void deleteQuietlyFromRegistryArea() {
+      deleteQuietly(registryBinary, false);
+      deleteQuietly(registrySource, false);
+    }
+
+    /**
+     * Removes quietly remote and local unique folders in temporary directories.
+     */
+    void cleanUp() {
+      FileUtils.deleteQuietly(new File(localTmpDir.toUri()));
+      deleteQuietly(remoteTmpDir, true);
+    }
+
+    /**
+     * Deletes quietly file or directory, in case of errors, logs warning and proceeds.
+     *
+     * @param path path to file or directory
+     * @param isDirectory set to true if we need to delete a directory
+     */
+    private void deleteQuietly(Path path, boolean isDirectory) {
+      try {
+        fs.delete(path, isDirectory);
+      } catch (IOException e) {
+        logger.warn(String.format("Error during deletion [%s]", path.toUri().getPath()), e);
+      }
+    }
+
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
new file mode 100644
index 0000000..5269a4b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.parser.SqlDropFunction;
+import org.apache.drill.exec.proto.UserBitShared.Jar;
+import org.apache.drill.exec.proto.UserBitShared.Registry;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+import org.apache.drill.exec.util.JarUtil;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class DropFunctionHandler extends DefaultSqlHandler {
+
+ private static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DropFunctionHandler.class);
+
+ public DropFunctionHandler(SqlHandlerConfig config) {
+ super(config);
+ }
+
+  /**
+   * Unregisters UDFs dynamically. Process consists of several steps:
+   * <ol>
+   * <li>Adds the jar name to the jar registry so that the same jar is not unregistered several times
+   * simultaneously.</li>
+   * <li>Runs the remote unregistration: reads the list of all jars and stores it back without the jar
+   * to be deleted.</li>
+   * <li>Signals drill bits to start local unregistration process.</li>
+   * <li>Removes binary and source jars from registry area.</li>
+   * </ol>
+   *
+   * UDFs unregistration is allowed only if dynamic UDFs support is enabled.
+   * Only jars registered dynamically can be unregistered,
+   * built-in functions loaded at start up are not allowed to be unregistered.
+   *
+   * Limitation: before jar unregistration make sure no one is using functions from this jar.
+   * There is no guarantee that running queries will finish successfully or give correct result.
+   *
+   * @return - Single row indicating list of unregistered UDFs, or a single row with the error message otherwise
+   */
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException {
+    final boolean dynamicUdfEnabled = context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val;
+    if (!dynamicUdfEnabled) {
+      throw UserException.validationError()
+          .message("Dynamic UDFs support is disabled.")
+          .build(logger);
+    }
+
+    final SqlDropFunction dropCall = unwrap(sqlNode, SqlDropFunction.class);
+    final String jarName = ((SqlCharStringLiteral) dropCall.getJar()).toValue();
+    final RemoteFunctionRegistry remoteFunctionRegistry = context.getRemoteFunctionRegistry();
+
+    // becomes true once this call has added the jar name to the jar registry
+    boolean inProgress = false;
+    try {
+      final String action = remoteFunctionRegistry.addToJars(jarName, RemoteFunctionRegistry.Action.UNREGISTRATION);
+      inProgress = action == null;
+      if (!inProgress) {
+        final String jarIsUsed = String.format("Jar with %s name is used. Action: %s", jarName, action);
+        return DirectPlan.createDirectPlan(context, false, jarIsUsed);
+      }
+
+      final Jar deletedJar = unregister(jarName, remoteFunctionRegistry, remoteFunctionRegistry.getRetryAttempts());
+      if (deletedJar == null) {
+        final String notRegistered = String.format("Jar %s is not registered in remote registry", jarName);
+        return DirectPlan.createDirectPlan(context, false, notRegistered);
+      }
+      remoteFunctionRegistry.submitForUnregistration(jarName);
+
+      removeJarFromArea(jarName, remoteFunctionRegistry.getFs(), remoteFunctionRegistry.getRegistryArea());
+      removeJarFromArea(JarUtil.getSourceName(jarName), remoteFunctionRegistry.getFs(), remoteFunctionRegistry.getRegistryArea());
+
+      final String summary = String.format("The following UDFs in jar %s have been unregistered:\n%s",
+          jarName, deletedJar.getFunctionSignatureList());
+      return DirectPlan.createDirectPlan(context, true, summary);
+
+    } catch (Exception e) {
+      logger.error("Error during UDF unregistration", e);
+      return DirectPlan.createDirectPlan(context, false, e.getMessage());
+    } finally {
+      if (inProgress) {
+        remoteFunctionRegistry.finishUnregistration(jarName);
+        remoteFunctionRegistry.removeFromJars(jarName);
+      }
+    }
+  }
+
+  /**
+   * First gets remote function registry with version.
+   * Version is used to ensure that we update the same registry we removed jars from.
+   * Looks for a jar to be deleted, if it finds one,
+   * attempts to update remote registry with updated list of jars, that excludes jar to be deleted.
+   * If during update {@link VersionMismatchException} was detected,
+   * calls itself recursively to start a new remote unregistration process and returns the result of that call.
+   * Since remote registry version has changed we need to look for jar to be deleted one more time,
+   * so the jar found during a previous attempt must not be reported.
+   * Each time recursive call occurs, decreases retry attempts counter by one.
+   * If retry attempts number hits 0, throws exception that failed to update remote function registry.
+   *
+   * @param jarName jar name
+   * @param remoteFunctionRegistry remote function registry
+   * @param retryAttempts number of retry attempts
+   * @return jar that was unregistered, null if the jar is not present in the remote registry
+   * @throws DrillRuntimeException if the retry attempts limit was exceeded
+   */
+  private Jar unregister(String jarName, RemoteFunctionRegistry remoteFunctionRegistry, int retryAttempts) {
+    DataChangeVersion version = new DataChangeVersion();
+    Registry registry = remoteFunctionRegistry.getRegistry(version);
+    Jar jarToBeDeleted = null;
+    List<Jar> jars = Lists.newArrayList();
+    for (Jar j : registry.getJarList()) {
+      if (j.getName().equals(jarName)) {
+        jarToBeDeleted = j;
+      } else {
+        jars.add(j);
+      }
+    }
+    if (jarToBeDeleted == null) {
+      return null;
+    }
+    Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build();
+    try {
+      remoteFunctionRegistry.updateRegistry(updatedRegistry, version);
+    } catch (VersionMismatchException ex) {
+      if (retryAttempts-- == 0) {
+        throw new DrillRuntimeException("Failed to update remote function registry. Exceeded retry attempts limit.");
+      }
+      // report what the retry actually removed, not the jar seen in the outdated registry snapshot
+      return unregister(jarName, remoteFunctionRegistry, retryAttempts);
+    }
+    return jarToBeDeleted;
+  }
+
+  /**
+   * Removes jar from indicated area, in case of error logs it (including the cause) and proceeds.
+   *
+   * @param jarName jar name
+   * @param fs file system
+   * @param area path to area
+   */
+  private void removeJarFromArea(String jarName, FileSystem fs, Path area) {
+    try {
+      fs.delete(new Path(area, jarName), false);
+    } catch (IOException e) {
+      // the exception is passed as the last argument so that its message and stack trace are logged
+      logger.error("Error removing jar {} from area {}", jarName, area.toUri().getPath(), e);
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index fa0d319..53e3cd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -174,6 +174,8 @@ public class CompoundIdentifierConverter extends SqlShuttle {
rules.put(SqlRefreshMetadata.class, R(D));
rules.put(SqlSetOption.class, R(D, D, D));
rules.put(SqlDescribeSchema.class, R(D));
+ rules.put(SqlCreateFunction.class, R(D));
+ rules.put(SqlDropFunction.class, R(D));
REWRITE_RULES = ImmutableMap.copyOf(rules);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java
new file mode 100644
index 0000000..c14f468
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.CreateFunctionHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+
+import java.util.List;
+
+/**
+ * Parse tree node of the CREATE FUNCTION USING JAR statement.
+ * Keeps the jar operand and delegates handling of the statement to {@link CreateFunctionHandler}.
+ */
+public class SqlCreateFunction extends DrillSqlCall {
+
+  /** Operator of this call, re-creates the call from its first operand (the jar). */
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_FUNCTION", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      final SqlNode jarOperand = operands[0];
+      return new SqlCreateFunction(pos, jarOperand);
+    }
+  };
+
+  private final SqlNode jar;
+
+  /**
+   * @param pos parser position of the statement
+   * @param jar node holding the jar of the statement
+   */
+  public SqlCreateFunction(SqlParserPos pos, SqlNode jar) {
+    super(pos);
+    this.jar = jar;
+  }
+
+  /** @return node holding the jar of the statement */
+  public SqlNode getJar() {
+    return jar;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  /** @return new mutable list with the jar node as its only element */
+  @Override
+  public List<SqlNode> getOperandList() {
+    return Lists.<SqlNode>newArrayList(jar);
+  }
+
+  /** Writes the statement as the keywords CREATE FUNCTION USING JAR followed by the unparsed jar node. */
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    for (String word : new String[] {"CREATE", "FUNCTION", "USING", "JAR"}) {
+      writer.keyword(word);
+    }
+    jar.unparse(writer, leftPrec, rightPrec);
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+    return new CreateFunctionHandler(config);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java
new file mode 100644
index 0000000..77d2b76
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.DropFunctionHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+
+import java.util.List;
+
+/**
+ * Parse tree node for the {@code DROP FUNCTION USING JAR <jar>} statement.
+ * The jar node is the call's single operand.
+ */
+public class SqlDropFunction extends DrillSqlCall {
+
+  /**
+   * Operator backing this call. Its {@code createCall} ignores the function qualifier
+   * and builds the node from the first operand only.
+   */
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_FUNCTION", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      final SqlNode jarOperand = operands[0];
+      return new SqlDropFunction(pos, jarOperand);
+    }
+  };
+
+  /** Node holding the jar reference given in the statement. */
+  private final SqlNode jar;
+
+  /**
+   * @param pos parser position of the statement
+   * @param jar node holding the jar reference
+   */
+  public SqlDropFunction(SqlParserPos pos, SqlNode jar) {
+    super(pos);
+    this.jar = jar;
+  }
+
+  /** @return the shared {@link #OPERATOR} instance */
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  /** @return a newly created mutable list containing only the jar node */
+  @Override
+  public List<SqlNode> getOperandList() {
+    return Lists.newArrayList(jar);
+  }
+
+  /**
+   * Writes the statement back as {@code DROP FUNCTION USING JAR} followed by the jar node.
+   */
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    for (String keyword : new String[] {"DROP", "FUNCTION", "USING", "JAR"}) {
+      writer.keyword(keyword);
+    }
+    jar.unparse(writer, leftPrec, rightPrec);
+  }
+
+  /** @return a new {@link DropFunctionHandler} built from the given config */
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+    return new DropFunctionHandler(config);
+  }
+
+  /** @return the jar node passed to the constructor */
+  public SqlNode getJar() {
+    return jar;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
index bd7c779..3eed022 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.TypeValidators;
+import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -84,10 +84,10 @@ public class InboundImpersonationManager {
/**
* Validator for impersonation policies.
*/
- public static class InboundImpersonationPolicyValidator extends TypeValidators.AdminOptionValidator {
+ public static class InboundImpersonationPolicyValidator extends StringValidator {
public InboundImpersonationPolicyValidator(String name, String def) {
- super(name, def);
+ super(name, def, true);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 55a2b05..3f74268 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -123,6 +123,7 @@ public class Drillbit implements AutoCloseable {
storageRegistry.init();
drillbitContext.getOptionManager().init();
javaPropertiesToSystemOptions();
+ manager.getContext().getRemoteFunctionRegistry().init(context.getConfig(), storeProvider, coord);
registrationHandle = coord.register(md);
webServer.start();
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 1af6d11..3eb87ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.exec.compile.CodeCompiler;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -180,8 +181,12 @@ public class DrillbitContext implements AutoCloseable {
return classpathScan;
}
+ public RemoteFunctionRegistry getRemoteFunctionRegistry() { return functionRegistry.getRemoteFunctionRegistry(); }
+
@Override
public void close() throws Exception {
getOptionManager().close();
+ getFunctionImplementationRegistry().close();
+ getRemoteFunctionRegistry().close();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
index db42603..82f4ab9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -26,9 +26,16 @@ public abstract class OptionValidator {
// Stored here as well as in the option static class to allow insertion of option optionName into
// the error messages produced by the validator
private final String optionName;
+ private final boolean isAdminOption;
+ /** By default, if admin option value is not specified, it would be set to false.*/
public OptionValidator(String optionName) {
+ this(optionName, false);
+ }
+
+ public OptionValidator(String optionName, boolean isAdminOption) {
this.optionName = optionName;
+ this.isAdminOption = isAdminOption;
}
/**
@@ -69,6 +76,13 @@ public abstract class OptionValidator {
}
/**
+ * @return true if option is a system-level property that can only be specified by admin (not user).
+ */
+ public boolean isAdminOption() {
+ return isAdminOption;
+ }
+
+ /**
* Gets the default option value for this validator.
*
* @return default option value
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 115ea47..ee94493 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -147,7 +147,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ExecConstants.IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR,
ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR,
ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR,
- ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR
+ ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR,
+ ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR
};
final Map<String, OptionValidator> tmp = new HashMap<>();
for (final OptionValidator validator : validators) {
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index d015040..b4074ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -141,25 +141,41 @@ public class TypeValidators {
public static class BooleanValidator extends TypeValidator {
public BooleanValidator(String name, boolean def) {
- super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def));
+ this(name, def, false);
+ }
+
+ public BooleanValidator(String name, boolean def, boolean isAdminOption) {
+ super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def), isAdminOption);
}
}
public static class StringValidator extends TypeValidator {
public StringValidator(String name, String def) {
- super(name, Kind.STRING, OptionValue.createString(OptionType.SYSTEM, name, def));
+ this(name, def, false);
+ }
+
+ public StringValidator(String name, String def, boolean isAdminOption) {
+ super(name, Kind.STRING, OptionValue.createString(OptionType.SYSTEM, name, def), isAdminOption);
}
}
public static class LongValidator extends TypeValidator {
public LongValidator(String name, long def) {
- super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def));
+ this(name, def, false);
+ }
+
+ public LongValidator(String name, long def, boolean isAdminOption) {
+ super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def), isAdminOption);
}
}
public static class DoubleValidator extends TypeValidator {
public DoubleValidator(String name, double def) {
- super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def));
+ this(name, def, false);
+ }
+
+ public DoubleValidator(String name, double def, boolean isAdminOption) {
+ super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def), isAdminOption);
}
}
@@ -184,22 +200,6 @@ public class TypeValidators {
}
}
- public static class AdminOptionValidator extends StringValidator {
- public AdminOptionValidator(String name, String def) {
- super(name, def);
- }
-
- @Override
- public void validate(final OptionValue v, final OptionManager manager) {
- if (v.type != OptionType.SYSTEM) {
- throw UserException.validationError()
- .message("Admin related settings can only be set at SYSTEM level scope. Given scope '%s'.", v.type)
- .build(logger);
- }
- super.validate(v, manager);
- }
- }
-
/**
* Validator that checks if the given value is included in a list of acceptable values. Case insensitive.
*/
@@ -229,7 +229,11 @@ public class TypeValidators {
private final OptionValue defaultValue;
public TypeValidator(final String name, final Kind kind, final OptionValue defValue) {
- super(name);
+ this(name, kind, defValue, false);
+ }
+
+ public TypeValidator(final String name, final Kind kind, final OptionValue defValue, final boolean isAdminOption) {
+ super(name, isAdminOption);
checkArgument(defValue.type == OptionType.SYSTEM, "Default value must be SYSTEM type.");
this.kind = kind;
this.defaultValue = defValue;
@@ -248,6 +252,11 @@ public class TypeValidators {
kind.name(), v.kind.name()))
.build(logger);
}
+ if (isAdminOption() && v.type != OptionType.SYSTEM) {
+ throw UserException.validationError()
+ .message("Admin related settings can only be set at SYSTEM level scope. Given scope '%s'.", v.type)
+ .build(logger);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
index 248c3cb..ea38278 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
@@ -17,11 +17,11 @@
*/
package org.apache.drill.exec.store.sys;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+
import java.util.Iterator;
import java.util.Map;
-import org.apache.drill.common.collections.ImmutableEntry;
-
public abstract class BasePersistentStore<V> implements PersistentStore<V> {
@Override
@@ -29,4 +29,18 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> {
return getRange(0, Integer.MAX_VALUE);
}
+ /** By default get with version will behave the same way as without version.
+ * This implementation delegates to {@link #get(String)} and neither reads
+ * nor updates the supplied version holder.
+ * Override this method to add version support.
+ *
+ * @param key lookup key
+ * @param version version holder; unused by this default implementation
+ * @return whatever {@link #get(String)} returns for the given key */
+ @Override
+ public V get(String key, DataChangeVersion version) {
+ return get(key);
+ }
+
+ /** By default put with version will behave the same way as without version.
+ * This implementation delegates to {@link #put(String, Object)} and neither reads
+ * nor updates the supplied version holder, so no version check is performed here.
+ * Override this method to add version support.
+ *
+ * @param key lookup key
+ * @param value value to store
+ * @param version version holder; unused by this default implementation */
+ @Override
+ public void put(String key, V value, DataChangeVersion version) {
+ put(key, value);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
index 767b1d5..bb23752 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.store.sys;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+
import java.util.Iterator;
import java.util.Map;
@@ -38,6 +40,14 @@ public interface PersistentStore<V> extends AutoCloseable {
V get(String key);
/**
+ * Returns the value for the given key if exists, null otherwise.
+ * Sets data change version number.
+ * @param key lookup key
+ * @param version version holder
+ */
+ V get(String key, DataChangeVersion version);
+
+ /**
* Stores the (key, value) tuple in the store. Lifetime of the tuple depends upon store {@link #getMode mode}.
*
* @param key lookup key
@@ -45,6 +55,17 @@ public interface PersistentStore<V> extends AutoCloseable {
*/
void put(String key, V value);
+ /**
+ * Stores the (key, value) tuple in the store.
+ * If tuple already exists, stores it only if versions match,
+ * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException}
+ * Lifetime of the tuple depends upon store {@link #getMode mode}.
+ *
+ * @param key lookup key
+ * @param value value to store
+ * @param version version holder
+ */
+ void put(String key, V value, DataChangeVersion version);
/**
* Removes the value corresponding to the given key if exists, nothing happens otherwise.
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
new file mode 100644
index 0000000..10c1b8f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+/**
+ * Mutable holder for a single integer version number.
+ * The version is 0 until {@link #setVersion(int)} is called.
+ */
+public class DataChangeVersion {
+
+  private int version;
+
+  /** @return the version number currently held */
+  public int getVersion() {
+    return this.version;
+  }
+
+  /** @param version version number to hold, replacing the previous one */
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
index 3dde4b8..55f72c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
@@ -63,7 +63,17 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
@Override
public V get(final String key) {
- final byte[] bytes = client.get(key);
+ return get(key, false, null);
+ }
+
+ @Override
+ public V get(final String key, DataChangeVersion version) {
+ return get(key, true, version);
+ }
+
+ public V get(final String key, boolean consistencyFlag, DataChangeVersion version) {
+ byte[] bytes = client.get(key, consistencyFlag, version);
+
if (bytes == null) {
return null;
}
@@ -76,28 +86,30 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
@Override
public void put(final String key, final V value) {
+ put(key, value, null);
+ }
+
+ @Override
+ public void put(final String key, final V value, DataChangeVersion version) {
final InstanceSerializer<V> serializer = config.getSerializer();
try {
final byte[] bytes = serializer.serialize(value);
- client.put(key, bytes);
+ client.put(key, bytes, version);
} catch (final IOException e) {
throw new DrillRuntimeException(String.format("unable to de/serialize value of type %s", value.getClass()), e);
}
}
+
@Override
public boolean putIfAbsent(final String key, final V value) {
- final V old = get(key);
- if (old == null) {
- try {
- final byte[] bytes = config.getSerializer().serialize(value);
- client.put(key, bytes);
- return true;
- } catch (final IOException e) {
- throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
- }
+ try {
+ final byte[] bytes = config.getSerializer().serialize(value);
+ final byte[] data = client.putIfAbsent(key, bytes);
+ return data == null;
+ } catch (final IOException e) {
+ throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
}
- return false;
}
@Override