You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/10/18 23:45:29 UTC
[09/11] drill git commit: DRILL-4726: Dynamic UDF Support
DRILL-4726: Dynamic UDF Support
1) Configuration / parsing / options / protos
2) Zookeeper integration
3) Registration / unregistration / lazy-init
4) Unit tests
This closes #574
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/89f2633f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/89f2633f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/89f2633f
Branch: refs/heads/master
Commit: 89f2633f612a645666de8f51dcb19c6f8044a95e
Parents: 8461d10
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Mon Aug 22 13:29:30 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Oct 18 10:47:52 2016 -0700
----------------------------------------------------------------------
.../apache/drill/common/config/DrillConfig.java | 9 +
.../drill/common/scanner/RunTimeScan.java | 21 +
distribution/src/resources/drill-config.sh | 18 +
distribution/src/resources/sqlline.bat | 5 +
exec/java-exec/src/main/codegen/data/Parser.tdd | 7 +-
.../src/main/codegen/includes/parserImpls.ftl | 40 +
.../org/apache/drill/exec/ExecConstants.java | 27 +-
.../drill/exec/coord/zk/ZkEphemeralStore.java | 17 +-
.../drill/exec/coord/zk/ZookeeperClient.java | 101 +-
.../exception/FunctionNotFoundException.java | 27 +
.../exception/FunctionValidationException.java | 28 +
.../exec/exception/JarValidationException.java | 28 +
.../exception/VersionMismatchException.java | 33 +
.../drill/exec/expr/fn/DrillFuncHolder.java | 28 +
.../exec/expr/fn/DrillFunctionRegistry.java | 221 ---
.../exec/expr/fn/DrillSimpleFuncHolder.java | 6 +-
.../drill/exec/expr/fn/FunctionConverter.java | 4 +-
.../expr/fn/FunctionImplementationRegistry.java | 342 ++++-
.../drill/exec/expr/fn/FunctionInitializer.java | 21 +-
.../exec/expr/fn/registry/FunctionHolder.java | 54 +
.../fn/registry/FunctionRegistryHolder.java | 377 +++++
.../drill/exec/expr/fn/registry/JarScan.java | 53 +
.../expr/fn/registry/LocalFunctionRegistry.java | 329 ++++
.../fn/registry/RemoteFunctionRegistry.java | 269 ++++
.../org/apache/drill/exec/ops/QueryContext.java | 5 +
.../exec/planner/sql/DrillOperatorTable.java | 24 +-
.../drill/exec/planner/sql/DrillSqlWorker.java | 27 +-
.../drill/exec/planner/sql/SqlConverter.java | 9 +
.../sql/handlers/CreateFunctionHandler.java | 328 ++++
.../sql/handlers/DropFunctionHandler.java | 167 ++
.../sql/parser/CompoundIdentifierConverter.java | 2 +
.../planner/sql/parser/SqlCreateFunction.java | 79 +
.../planner/sql/parser/SqlDropFunction.java | 79 +
.../rpc/user/InboundImpersonationManager.java | 6 +-
.../org/apache/drill/exec/server/Drillbit.java | 1 +
.../drill/exec/server/DrillbitContext.java | 5 +
.../exec/server/options/OptionValidator.java | 14 +
.../server/options/SystemOptionManager.java | 3 +-
.../exec/server/options/TypeValidators.java | 51 +-
.../exec/store/sys/BasePersistentStore.java | 18 +-
.../drill/exec/store/sys/PersistentStore.java | 21 +
.../exec/store/sys/store/DataChangeVersion.java | 32 +
.../sys/store/ZookeeperPersistentStore.java | 36 +-
.../exec/testing/store/NoWriteLocalStore.java | 61 +-
.../org/apache/drill/exec/util/JarUtil.java | 33 +
.../src/main/resources/drill-module.conf | 29 +
.../java/org/apache/drill/BaseTestQuery.java | 14 -
.../org/apache/drill/TestDynamicUDFSupport.java | 801 ++++++++++
.../java/org/apache/drill/exec/ExecTest.java | 22 +-
.../exec/coord/zk/TestZookeeperClient.java | 49 +-
.../fn/registry/FunctionRegistryHolderTest.java | 279 ++++
.../exec/physical/impl/TestSimpleFunctions.java | 20 +-
.../resources/jars/DrillUDF-1.0-sources.jar | Bin 0 -> 1892 bytes
.../src/test/resources/jars/DrillUDF-1.0.jar | Bin 0 -> 3146 bytes
.../resources/jars/DrillUDF-2.0-sources.jar | Bin 0 -> 1891 bytes
.../src/test/resources/jars/DrillUDF-2.0.jar | Bin 0 -> 3142 bytes
.../jars/DrillUDF_Copy-1.0-sources.jar | Bin 0 -> 1892 bytes
.../test/resources/jars/DrillUDF_Copy-1.0.jar | Bin 0 -> 3185 bytes
.../jars/DrillUDF_DupFunc-1.0-sources.jar | Bin 0 -> 1888 bytes
.../resources/jars/DrillUDF_DupFunc-1.0.jar | Bin 0 -> 3201 bytes
.../jars/DrillUDF_Empty-1.0-sources.jar | Bin 0 -> 536 bytes
.../test/resources/jars/DrillUDF_Empty-1.0.jar | Bin 0 -> 1863 bytes
.../jars/DrillUDF_NoMarkerFile-1.0-sources.jar | Bin 0 -> 1715 bytes
.../jars/DrillUDF_NoMarkerFile-1.0.jar | Bin 0 -> 3084 bytes
.../resources/jars/v2/DrillUDF-1.0-sources.jar | Bin 0 -> 1899 bytes
.../src/test/resources/jars/v2/DrillUDF-1.0.jar | Bin 0 -> 3215 bytes
.../org/apache/drill/jdbc/ITTestShadedJar.java | 19 +
.../drill/exec/proto/SchemaUserBitShared.java | 231 +++
.../apache/drill/exec/proto/UserBitShared.java | 1439 +++++++++++++++++-
.../org/apache/drill/exec/proto/beans/Jar.java | 195 +++
.../apache/drill/exec/proto/beans/Registry.java | 175 +++
protocol/src/main/protobuf/UserBitShared.proto | 20 +
72 files changed, 5986 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 43d05c3..6828718 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -164,6 +164,15 @@ public class DrillConfig extends NestedConfig {
}
/**
+ * Creates a drill configuration from the provided {@link Config} object.
+ * @param config custom configuration
+ * @return {@link DrillConfig} instance
+ */
+ public static DrillConfig create(Config config) {
+ return new DrillConfig(config.resolve(), true);
+ }
+
+ /**
* @param overrideFileResourcePathname
* see {@link #create(String)}'s {@code overrideFileResourcePathname}
* @param overriderProps
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
index 1d95b04..7faa0fb 100644
--- a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
+++ b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
@@ -20,7 +20,9 @@ package org.apache.drill.common.scanner;
import java.net.URL;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
+import com.google.common.collect.Lists;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -75,4 +77,23 @@ public class RunTimeScan {
}
}
+ /**
+ * Scans packages retrieved from config.
+ * Returns scan result with list of packages, classes and annotations found.
+ * Is used to scan specific jars not associated with classpath at runtime.
+ *
+ * @param config to retrieve the packages to scan
+ * @param markedPath set of URLs to scan
+ * @return the scan result with list of packages, classes and annotations found
+ */
+ public static ScanResult dynamicPackageScan(DrillConfig config, Set<URL> markedPath) {
+ List<String> packagePrefixes = ClassPathScanner.getPackagePrefixes(config);
+ return ClassPathScanner.scan(
+ markedPath,
+ packagePrefixes,
+ Lists.<String>newArrayList(),
+ PRESCANNED.getScannedAnnotations(),
+ ClassPathScanner.emptyResult());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/distribution/src/resources/drill-config.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-config.sh b/distribution/src/resources/drill-config.sh
index 737be36..7a72e27 100644
--- a/distribution/src/resources/drill-config.sh
+++ b/distribution/src/resources/drill-config.sh
@@ -324,6 +324,22 @@ if [ -n "$DRILL_CLASSPATH" ]; then
CP="$CP:$DRILL_CLASSPATH"
fi
+# Drill temporary directory is used as base for temporary storage of Dynamic UDF jars.
+# If tmp dir is given, it must exist.
+if [ -n "$DRILL_TMP_DIR" ]; then
+ if [[ ! -d "$DRILL_TMP_DIR" ]]; then
+ fatal_error "Temporary dir does not exist:" $DRILL_TMP_DIR
+ fi
+else
+ # Otherwise, use the default
+ DRILL_TMP_DIR="/tmp"
+fi
+
+mkdir -p "$DRILL_TMP_DIR"
+if [[ ! -d "$DRILL_TMP_DIR" || ! -w "$DRILL_TMP_DIR" ]]; then
+ fatal_error "Temporary directory does not exist or is not writable: $DRILL_TMP_DIR"
+fi
+
# Test for cygwin
is_cygwin=false
case "`uname`" in
@@ -371,6 +387,7 @@ if $is_cygwin; then
DRILL_HOME=`cygpath -w "$DRILL_HOME"`
DRILL_CONF_DIR=`cygpath -w "$DRILL_CONF_DIR"`
DRILL_LOG_DIR=`cygpath -w "$DRILL_LOG_DIR"`
+ DRILL_TMP_DIR=`cygpath -w "$DRILL_TMP_DIR"`
CP=`cygpath -w -p "$CP"`
if [ -z "$HADOOP_HOME" ]; then
export HADOOP_HOME=${DRILL_HOME}/winutils
@@ -391,6 +408,7 @@ export is_cygwin
export DRILL_HOME
export DRILL_CONF_DIR
export DRILL_LOG_DIR
+export DRILL_TMP_DIR
export CP
# DRILL-4870: Don't export JAVA_HOME. Java can find it just fine from the java
# command. If we attempt to work it out, we do so incorrectly for the Mac.
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/distribution/src/resources/sqlline.bat
----------------------------------------------------------------------
diff --git a/distribution/src/resources/sqlline.bat b/distribution/src/resources/sqlline.bat
index a0efdf1..f008604 100755
--- a/distribution/src/resources/sqlline.bat
+++ b/distribution/src/resources/sqlline.bat
@@ -114,6 +114,11 @@ if "test%DRILL_LOG_DIR%" == "test" (
set DRILL_LOG_DIR=%DRILL_HOME%\log
)
+@rem Drill temporary directory is used as base for temporary storage of Dynamic UDF jars.
+if "test%DRILL_TMP_DIR%" == "test" (
+ set DRILL_TMP_DIR=%TEMP%
+)
+
rem ----
rem Deal with Hadoop JARs, if HADOOP_HOME was specified
rem ----
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd
index ce3ee4c..6c23808 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -38,7 +38,8 @@
"REFRESH",
"METADATA",
"DATABASE",
- "IF"
+ "IF",
+ "JAR"
]
# List of methods for parsing custom SQL statements.
@@ -53,7 +54,9 @@
"SqlShowFiles()",
"SqlCreateTable()",
"SqlDropTable()",
- "SqlRefreshMetadata()"
+ "SqlRefreshMetadata()",
+ "SqlCreateFunction()",
+ "SqlDropFunction()"
]
# List of methods for parsing custom literals.
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 9901098..0017446 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -297,4 +297,44 @@ SqlNode SqlDescribeSchema() :
{
return new SqlDescribeSchema(pos, schema);
}
+}
+
+/**
+* Parse create UDF statement
+* CREATE FUNCTION USING JAR 'jar_name'
+*/
+SqlNode SqlCreateFunction() :
+{
+ SqlParserPos pos;
+ SqlNode jar;
+}
+{
+ <CREATE> { pos = getPos(); }
+ <FUNCTION>
+ <USING>
+ <JAR>
+ jar = StringLiteral()
+ {
+ return new SqlCreateFunction(pos, jar);
+ }
+}
+
+/**
+* Parse drop UDF statement
+* DROP FUNCTION USING JAR 'jar_name'
+*/
+SqlNode SqlDropFunction() :
+{
+ SqlParserPos pos;
+ SqlNode jar;
+}
+{
+ <DROP> { pos = getPos(); }
+ <FUNCTION>
+ <USING>
+ <JAR>
+ jar = StringLiteral()
+ {
+ return new SqlDropFunction(pos, jar);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index d6a210a..0f2321b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.rpc.user.InboundImpersonationManager;
import org.apache.drill.exec.server.options.OptionValidator;
-import org.apache.drill.exec.server.options.TypeValidators.AdminOptionValidator;
import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator;
import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValidator;
@@ -106,10 +105,23 @@ public interface ExecConstants {
String RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS =
"drill.exec.debug.return_error_for_failure_in_cancelled_fragments";
+ String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types";
+ /**
+ * Configuration properties connected with dynamic UDFs support
+ */
+ String UDF_RETRY_ATTEMPTS = "drill.exec.udf.retry-attempts";
+ String UDF_DIRECTORY_FS = "drill.exec.udf.directory.fs";
+ String UDF_DIRECTORY_ROOT = "drill.exec.udf.directory.root";
+ String UDF_DIRECTORY_LOCAL = "drill.exec.udf.directory.local";
+ String UDF_DIRECTORY_STAGING = "drill.exec.udf.directory.staging";
+ String UDF_DIRECTORY_REGISTRY = "drill.exec.udf.directory.registry";
+ String UDF_DIRECTORY_TMP = "drill.exec.udf.directory.tmp";
-
- String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types";
+ /**
+ * Local temporary directory is used as base for temporary storage of Dynamic UDF jars.
+ */
+ String DRILL_TMP_DIR = "drill.tmp-dir";
String OUTPUT_FORMAT_OPTION = "store.format";
OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
@@ -296,15 +308,13 @@ public interface ExecConstants {
* such as changing system options.
*/
String ADMIN_USERS_KEY = "security.admin.users";
- StringValidator ADMIN_USERS_VALIDATOR =
- new AdminOptionValidator(ADMIN_USERS_KEY, ImpersonationUtil.getProcessUserName());
+ StringValidator ADMIN_USERS_VALIDATOR = new StringValidator(ADMIN_USERS_KEY, ImpersonationUtil.getProcessUserName(), true);
/**
* Option whose value is a comma separated list of admin usergroups.
*/
String ADMIN_USER_GROUPS_KEY = "security.admin.user_groups";
- StringValidator ADMIN_USER_GROUPS_VALIDATOR = new AdminOptionValidator(ADMIN_USER_GROUPS_KEY, "");
-
+ StringValidator ADMIN_USER_GROUPS_VALIDATOR = new StringValidator(ADMIN_USER_GROUPS_KEY, "", true);
/**
* Option whose value is a string representing list of inbound impersonation policies.
*
@@ -337,4 +347,7 @@ public interface ExecConstants {
String CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS = "prepare.statement.create_timeout_ms";
OptionValidator CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR =
new PositiveLongValidator(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS, Integer.MAX_VALUE, 10000);
+
+ String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support";
+ BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
index 94e03ad..f485e9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
@@ -88,16 +88,17 @@ public class ZkEphemeralStore<V> extends BaseTransientStore<V> {
@Override
public V putIfAbsent(final String key, final V value) {
- final V old = get(key);
- if (old == null) {
- try {
- final byte[] bytes = config.getSerializer().serialize(value);
- getClient().put(key, bytes);
- } catch (final IOException e) {
- throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
+ try {
+ final InstanceSerializer<V> serializer = config.getSerializer();
+ final byte[] bytes = serializer.serialize(value);
+ final byte[] data = getClient().putIfAbsent(key, bytes);
+ if (data == null) {
+ return null;
}
+ return serializer.deserialize(data);
+ } catch (final IOException e) {
+ throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
}
- return old;
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
index 2debf43..610a2b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -31,8 +31,12 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
/**
* A namespace aware Zookeeper client.
@@ -133,13 +137,52 @@ public class ZookeeperClient implements AutoCloseable {
* the check is eventually consistent.
*
* @param path target path
+ * @param consistent consistency flag
*/
public byte[] get(final String path, final boolean consistent) {
+ return get(path, consistent, null);
+ }
+
+ /**
+ * Returns the value corresponding to the given key, null otherwise.
+ *
+ * The check is consistent as it is made against Zookeeper directly.
+ *
+ * Passes version holder to get data change version.
+ *
+ * @param path target path
+ * @param version version holder
+ */
+ public byte[] get(final String path, DataChangeVersion version) {
+ return get(path, true, version);
+ }
+
+ /**
+ * Returns the value corresponding to the given key, null otherwise.
+ *
+ * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise,
+ * the check is eventually consistent.
+ *
+ * If consistency flag is set to true and version holder is not null, passes version holder to get data change version.
+ * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed.
+ * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
+ *
+ * @param path target path
+ * @param consistent consistency check
+ * @param version version holder
+ */
+ public byte[] get(final String path, final boolean consistent, final DataChangeVersion version) {
Preconditions.checkNotNull(path, "path is required");
final String target = PathUtils.join(root, path);
if (consistent) {
try {
+ if (version != null) {
+ Stat stat = new Stat();
+ final byte[] bytes = curator.getData().storingStatIn(stat).forPath(target);
+ version.setVersion(stat.getVersion());
+ return bytes;
+ }
return curator.getData().forPath(target);
} catch (final Exception ex) {
throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex);
@@ -179,6 +222,26 @@ public class ZookeeperClient implements AutoCloseable {
* @param data data to store
*/
public void put(final String path, final byte[] data) {
+ put(path, data, null);
+ }
+
+ /**
+ * Puts the given byte sequence into the given path.
+ *
+ * If path does not exist, this call creates it.
+ *
+ * If version holder is not null and path already exists, passes given version for comparison.
+ * Zookeeper maintains stat structure that holds version number which increases each time znode data change is performed.
+ * If we pass version that doesn't match the actual version of the data,
+ * the update will fail with {@link org.apache.zookeeper.KeeperException.BadVersionException}.
+ * We catch such exception and re-throw it as {@link VersionMismatchException}.
+ * Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
+ *
+ * @param path target path
+ * @param data data to store
+ * @param version version holder
+ */
+ public void put(final String path, final byte[] data, DataChangeVersion version) {
Preconditions.checkNotNull(path, "path is required");
Preconditions.checkNotNull(data, "data is required");
@@ -199,9 +262,45 @@ public class ZookeeperClient implements AutoCloseable {
}
}
if (hasNode) {
- curator.setData().forPath(target, data);
+ if (version != null) {
+ try {
+ curator.setData().withVersion(version.getVersion()).forPath(target, data);
+ } catch (final KeeperException.BadVersionException e) {
+ throw new VersionMismatchException("Unable to put data. Version mismatch is detected.", version.getVersion(), e);
+ }
+ } else {
+ curator.setData().forPath(target, data);
+ }
}
getCache().rebuildNode(target);
+ } catch (final VersionMismatchException e) {
+ throw e;
+ } catch (final Exception e) {
+ throw new DrillRuntimeException("unable to put ", e);
+ }
+ }
+
+ /**
+ * Puts the given byte sequence into the given path if path does not exist.
+ *
+ * @param path target path
+ * @param data data to store
+ * @return null if path was created, else data stored for the given path
+ */
+ public byte[] putIfAbsent(final String path, final byte[] data) {
+ Preconditions.checkNotNull(path, "path is required");
+ Preconditions.checkNotNull(data, "data is required");
+
+ final String target = PathUtils.join(root, path);
+ try {
+ try {
+ curator.create().withMode(mode).forPath(target, data);
+ getCache().rebuildNode(target);
+ return null;
+ } catch (NodeExistsException e) {
+ // do nothing
+ }
+ return curator.getData().forPath(target);
} catch (final Exception e) {
throw new DrillRuntimeException("unable to put ", e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
new file mode 100644
index 0000000..0d59cc8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class FunctionNotFoundException extends DrillRuntimeException {
+
+ public FunctionNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java
new file mode 100644
index 0000000..7475e24
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class FunctionValidationException extends DrillRuntimeException {
+
+ public FunctionValidationException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java
new file mode 100644
index 0000000..a6fa407
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class JarValidationException extends DrillRuntimeException {
+
+ public JarValidationException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java
new file mode 100644
index 0000000..796f410
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class VersionMismatchException extends DrillRuntimeException {
+
+ public VersionMismatchException(String message, int expectedVersion, Throwable cause) {
+ super(message + ". Expected version : " + expectedVersion, cause);
+ }
+
+ public VersionMismatchException(String message, int expectedVersion) {
+ super(message + ". Expected version : " + expectedVersion);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 869a4ac..fc51d03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -132,6 +132,34 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
return attributes.isDeterministic();
}
+ /**
+ * Generates string representation of function input parameters:
+ * PARAMETER_TYPE_1-PARAMETER_MODE_1,PARAMETER_TYPE_2-PARAMETER_MODE_2
+ * Example: VARCHAR-REQUIRED,VARCHAR-OPTIONAL
+ * Returns empty string if function has no input parameters.
+ *
+ * @return string representation of function input parameters
+ */
+ public String getInputParameters() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("");
+ for (ValueReference ref : parameters) {
+ final MajorType type = ref.getType();
+ builder.append(",");
+ builder.append(type.getMinorType().toString());
+ builder.append("-");
+ builder.append(type.getMode().toString());
+ }
+ return builder.length() == 0 ? builder.toString() : builder.substring(1);
+ }
+
+ /**
+ * @return instance of class loader used to load function
+ */
+ public ClassLoader getClassLoader() {
+ return initializer.getClassLoader();
+ }
+
protected JVar[] declareWorkspaceVariables(ClassGenerator<?> g) {
JVar[] workspaceJVars = new JVar[workspaceVars.length];
for (int i = 0; i < workspaceVars.length; i++) {
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
deleted file mode 100644
index f58d5a5..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.expr.fn;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
-import org.apache.drill.common.scanner.persistence.ScanResult;
-import org.apache.drill.exec.planner.logical.DrillConstExecutor;
-import org.apache.drill.exec.planner.sql.DrillOperatorTable;
-import org.apache.drill.exec.planner.sql.DrillSqlAggOperator;
-import org.apache.drill.exec.planner.sql.DrillSqlAggOperatorWithoutInference;
-import org.apache.drill.exec.planner.sql.DrillSqlOperator;
-
-import com.google.common.collect.ArrayListMultimap;
-import org.apache.drill.exec.planner.sql.DrillSqlOperatorWithoutInference;
-
-/**
- * Registry of Drill functions.
- */
-public class DrillFunctionRegistry {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFunctionRegistry.class);
-
- // key: function name (lowercase) value: list of functions with that name
- private final ArrayListMultimap<String, DrillFuncHolder> registeredFunctions = ArrayListMultimap.create();
-
- private static final ImmutableMap<String, Pair<Integer, Integer>> registeredFuncNameToArgRange = ImmutableMap.<String, Pair<Integer, Integer>> builder()
- // CONCAT is allowed to take [1, infinity) number of arguments.
- // Currently, this flexibility is offered by DrillOptiq to rewrite it as
- // a nested structure
- .put("CONCAT", Pair.of(1, Integer.MAX_VALUE))
-
- // When LENGTH is given two arguments, this function relies on DrillOptiq to rewrite it as
- // another function based on the second argument (encodingType)
- .put("LENGTH", Pair.of(1, 2))
-
- // Dummy functions
- .put("CONVERT_TO", Pair.of(2, 2))
- .put("CONVERT_FROM", Pair.of(2, 2))
- .put("FLATTEN", Pair.of(1, 1)).build();
-
- public DrillFunctionRegistry(ScanResult classpathScan) {
- FunctionConverter converter = new FunctionConverter();
- List<AnnotatedClassDescriptor> providerClasses = classpathScan.getAnnotatedClasses();
-
- // Hash map to prevent registering functions with exactly matching signatures
- // key: Function Name + Input's Major Type
- // value: Class name where function is implemented
- //
- final Map<String, String> functionSignatureMap = new HashMap<>();
- for (AnnotatedClassDescriptor func : providerClasses) {
- DrillFuncHolder holder = converter.getHolder(func);
- if (holder != null) {
- // register handle for each name the function can be referred to
- String[] names = holder.getRegisteredNames();
-
- // Create the string for input types
- String functionInput = "";
- for (DrillFuncHolder.ValueReference ref : holder.parameters) {
- functionInput += ref.getType().toString();
- }
- for (String name : names) {
- String functionName = name.toLowerCase();
- registeredFunctions.put(functionName, holder);
- String functionSignature = functionName + functionInput;
- String existingImplementation;
- if ((existingImplementation = functionSignatureMap.get(functionSignature)) != null) {
- throw new AssertionError(
- String.format(
- "Conflicting functions with similar signature found. Func Name: %s, Class name: %s " +
- " Class name: %s", functionName, func.getClassName(), existingImplementation));
- } else if (holder.isAggregating() && !holder.isDeterministic() ) {
- logger.warn("Aggregate functions must be deterministic, did not register function {}", func.getClassName());
- } else {
- functionSignatureMap.put(functionSignature, func.getClassName());
- }
- }
- } else {
- logger.warn("Unable to initialize function for class {}", func.getClassName());
- }
- }
- if (logger.isTraceEnabled()) {
- StringBuilder allFunctions = new StringBuilder();
- for (DrillFuncHolder method: registeredFunctions.values()) {
- allFunctions.append(method.toString()).append("\n");
- }
- logger.trace("Registered functions: [\n{}]", allFunctions);
- }
- }
-
- public int size(){
- return registeredFunctions.size();
- }
-
- /** Returns functions with given name. Function name is case insensitive. */
- public List<DrillFuncHolder> getMethods(String name) {
- return this.registeredFunctions.get(name.toLowerCase());
- }
-
- public void register(DrillOperatorTable operatorTable) {
- registerOperatorsWithInference(operatorTable);
- registerOperatorsWithoutInference(operatorTable);
- }
-
- private void registerOperatorsWithInference(DrillOperatorTable operatorTable) {
- final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = Maps.newHashMap();
- final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = Maps.newHashMap();
- for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.asMap().entrySet()) {
- final ArrayListMultimap<Pair<Integer, Integer>, DrillFuncHolder> functions = ArrayListMultimap.create();
- final ArrayListMultimap<Integer, DrillFuncHolder> aggregateFunctions = ArrayListMultimap.create();
- final String name = function.getKey().toUpperCase();
- boolean isDeterministic = true;
- for (DrillFuncHolder func : function.getValue()) {
- final int paramCount = func.getParamCount();
- if(func.isAggregating()) {
- aggregateFunctions.put(paramCount, func);
- } else {
- final Pair<Integer, Integer> argNumberRange;
- if(registeredFuncNameToArgRange.containsKey(name)) {
- argNumberRange = registeredFuncNameToArgRange.get(name);
- } else {
- argNumberRange = Pair.of(func.getParamCount(), func.getParamCount());
- }
- functions.put(argNumberRange, func);
- }
-
- if(!func.isDeterministic()) {
- isDeterministic = false;
- }
- }
- for (Entry<Pair<Integer, Integer>, Collection<DrillFuncHolder>> entry : functions.asMap().entrySet()) {
- final Pair<Integer, Integer> range = entry.getKey();
- final int max = range.getRight();
- final int min = range.getLeft();
- if(!map.containsKey(name)) {
- map.put(name, new DrillSqlOperator.DrillSqlOperatorBuilder()
- .setName(name));
- }
-
- final DrillSqlOperator.DrillSqlOperatorBuilder drillSqlOperatorBuilder = map.get(name);
- drillSqlOperatorBuilder
- .addFunctions(entry.getValue())
- .setArgumentCount(min, max)
- .setDeterministic(isDeterministic);
- }
- for (Entry<Integer, Collection<DrillFuncHolder>> entry : aggregateFunctions.asMap().entrySet()) {
- if(!mapAgg.containsKey(name)) {
- mapAgg.put(name, new DrillSqlAggOperator.DrillSqlAggOperatorBuilder().setName(name));
- }
-
- final DrillSqlAggOperator.DrillSqlAggOperatorBuilder drillSqlAggOperatorBuilder = mapAgg.get(name);
- drillSqlAggOperatorBuilder
- .addFunctions(entry.getValue())
- .setArgumentCount(entry.getKey(), entry.getKey());
- }
- }
-
- for(final Entry<String, DrillSqlOperator.DrillSqlOperatorBuilder> entry : map.entrySet()) {
- operatorTable.addOperatorWithInference(
- entry.getKey(),
- entry.getValue().build());
- }
-
- for(final Entry<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> entry : mapAgg.entrySet()) {
- operatorTable.addOperatorWithInference(
- entry.getKey(),
- entry.getValue().build());
- }
- }
-
- private void registerOperatorsWithoutInference(DrillOperatorTable operatorTable) {
- SqlOperator op;
- for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.asMap().entrySet()) {
- Set<Integer> argCounts = Sets.newHashSet();
- String name = function.getKey().toUpperCase();
- for (DrillFuncHolder func : function.getValue()) {
- if (argCounts.add(func.getParamCount())) {
- if (func.isAggregating()) {
- op = new DrillSqlAggOperatorWithoutInference(name, func.getParamCount());
- } else {
- boolean isDeterministic;
- // prevent Drill from folding constant functions with types that cannot be materialized
- // into literals
- if (DrillConstExecutor.NON_REDUCIBLE_TYPES.contains(func.getReturnType().getMinorType())) {
- isDeterministic = false;
- } else {
- isDeterministic = func.isDeterministic();
- }
- op = new DrillSqlOperatorWithoutInference(name, func.getParamCount(), func.getReturnType(), isDeterministic);
- }
- operatorTable.addOperatorWithoutInference(function.getKey(), op);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
index 78e4c62..655f571 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
@@ -40,10 +40,14 @@ public class DrillSimpleFuncHolder extends DrillFuncHolder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSimpleFuncHolder.class);
private final String drillFuncClass;
+ // each function should be associated with the unique class loader of its jar
+ // to prevent classpath collisions during loading and unloading of jars
+ private final ClassLoader classLoader;
public DrillSimpleFuncHolder(FunctionAttributes functionAttributes, FunctionInitializer initializer) {
super(functionAttributes, initializer);
drillFuncClass = checkNotNull(initializer.getClassName());
+ classLoader = checkNotNull(initializer.getClassLoader());
}
private String setupBody() {
@@ -65,7 +69,7 @@ public class DrillSimpleFuncHolder extends DrillFuncHolder {
}
public DrillSimpleFunc createInterpreter() throws Exception {
- return (DrillSimpleFunc)Class.forName(drillFuncClass).newInstance();
+ return (DrillSimpleFunc)Class.forName(drillFuncClass, true, classLoader).newInstance();
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
index 00be7aa..2f606e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
@@ -50,7 +50,7 @@ import com.google.common.collect.Lists;
public class FunctionConverter {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionConverter.class);
- public <T extends DrillFunc> DrillFuncHolder getHolder(AnnotatedClassDescriptor func) {
+ public <T extends DrillFunc> DrillFuncHolder getHolder(AnnotatedClassDescriptor func, ClassLoader classLoader) {
FunctionTemplate template = func.getAnnotationProxy(FunctionTemplate.class);
if (template == null) {
return failure("Class does not declare FunctionTemplate annotation.", func);
@@ -173,7 +173,7 @@ public class FunctionConverter {
return failure("This function declares zero output fields. A function must declare one output field.", func);
}
- FunctionInitializer initializer = new FunctionInitializer(func.getClassName());
+ FunctionInitializer initializer = new FunctionInitializer(func.getClassName(), classLoader);
try{
// return holder
ValueReference[] ps = params.toArray(new ValueReference[params.size()]);
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 5d26325..ede255a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -17,38 +17,69 @@
*/
package org.apache.drill.exec.expr.fn;
+import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLConnection;
+import java.util.Enumeration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.config.CommonConstants;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.fn.CastFunctions;
import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.RunTimeScan;
import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.store.TransientStoreEvent;
+import org.apache.drill.exec.coord.store.TransientStoreListener;
+import org.apache.drill.exec.exception.FunctionValidationException;
+import org.apache.drill.exec.exception.JarValidationException;
+import org.apache.drill.exec.expr.fn.registry.LocalFunctionRegistry;
+import org.apache.drill.exec.expr.fn.registry.JarScan;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
+import org.apache.drill.exec.proto.UserBitShared.Jar;
import org.apache.drill.exec.resolver.FunctionResolver;
import org.apache.drill.exec.server.options.OptionManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.JarUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/**
* This class offers the registry for functions. Notably, in addition to Drill its functions
- * (in {@link DrillFunctionRegistry}), other PluggableFunctionRegistry (e.g., {@link org.apache.drill.exec.expr.fn.HiveFunctionRegistry})
+ * (in {@link LocalFunctionRegistry}), other PluggableFunctionRegistry (e.g., {@link org.apache.drill.exec.expr.fn.HiveFunctionRegistry})
* is also registered in this class
*/
-public class FunctionImplementationRegistry implements FunctionLookupContext {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionImplementationRegistry.class);
+public class FunctionImplementationRegistry implements FunctionLookupContext, AutoCloseable {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionImplementationRegistry.class);
- private DrillFunctionRegistry drillFuncRegistry;
+ private final LocalFunctionRegistry localFunctionRegistry;
+ private final RemoteFunctionRegistry remoteFunctionRegistry;
+ private final Path localUdfDir;
+ private boolean deleteTmpDir = false;
+ private File tmpDir;
private List<PluggableFunctionRegistry> pluggableFuncRegistries = Lists.newArrayList();
private OptionManager optionManager = null;
@@ -61,7 +92,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
Stopwatch w = Stopwatch.createStarted();
logger.debug("Generating function registry.");
- drillFuncRegistry = new DrillFunctionRegistry(classpathScan);
+ localFunctionRegistry = new LocalFunctionRegistry(classpathScan);
Set<Class<? extends PluggableFunctionRegistry>> registryClasses =
classpathScan.getImplementations(PluggableFunctionRegistry.class);
@@ -85,7 +116,9 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
break;
}
}
- logger.info("Function registry loaded. {} functions loaded in {} ms.", drillFuncRegistry.size(), w.elapsed(TimeUnit.MILLISECONDS));
+ logger.info("Function registry loaded. {} functions loaded in {} ms.", localFunctionRegistry.size(), w.elapsed(TimeUnit.MILLISECONDS));
+ this.remoteFunctionRegistry = new RemoteFunctionRegistry(new UnregistrationListener());
+ this.localUdfDir = getLocalUdfDir(config);
}
public FunctionImplementationRegistry(DrillConfig config, ScanResult classpathScan, OptionManager optionManager) {
@@ -99,7 +132,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
*/
public void register(DrillOperatorTable operatorTable) {
// Register Drill functions first and move to pluggable function registries.
- drillFuncRegistry.register(operatorTable);
+ localFunctionRegistry.register(operatorTable);
for(PluggableFunctionRegistry registry : pluggableFuncRegistries) {
registry.register(operatorTable);
@@ -109,14 +142,26 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
/**
* Using the given <code>functionResolver</code> find Drill function implementation for given
* <code>functionCall</code>
- *
- * @param functionResolver
- * @param functionCall
- * @return
+ * If no function implementation is found and Dynamic UDF Support is enabled,
+ * loads all missing remote functions and tries to find the Drill implementation one more time.
*/
@Override
public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) {
- return functionResolver.getBestMatch(drillFuncRegistry.getMethods(functionReplacement(functionCall)), functionCall);
+ return findDrillFunction(functionResolver, functionCall, true);
+ }
+
+ private DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall, boolean retry) {
+ // retry == false marks the second (final) attempt, so at most one remote load happens per lookup
+ AtomicLong version = new AtomicLong();
+ DrillFuncHolder holder = functionResolver.getBestMatch(
+ localFunctionRegistry.getMethods(functionReplacement(functionCall), version), functionCall);
+ if (holder == null && retry
+ && optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val
+ && loadRemoteFunctions(version.get())) {
+ // local registry changed: return the result of the second lookup instead of discarding it
+ return findDrillFunction(functionResolver, functionCall, false);
+ }
+ return holder;
}
// Check if this Function Replacement is needed; if yes, return a new name. otherwise, return the original name
@@ -138,18 +183,26 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
/**
* Find the Drill function implementation that matches the name, arg types and return type.
- * @param name
- * @param argTypes
- * @param returnType
- * @return
+ * If no exact function implementation is found and Dynamic UDF Support is enabled,
+ * loads all missing remote functions and tries to find the Drill implementation one more time.
*/
public DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType) {
- for (DrillFuncHolder h : drillFuncRegistry.getMethods(name)) {
+ return findExactMatchingDrillFunction(name, argTypes, returnType, true);
+ }
+
+ private DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType, boolean retry) {
+ AtomicLong version = new AtomicLong();
+ for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) {
if (h.matches(returnType, argTypes)) {
return h;
}
}
+ if (retry && optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val
+ && loadRemoteFunctions(version.get())) {
+ // local registry changed: return the result of the second lookup instead of discarding it
+ return findExactMatchingDrillFunction(name, argTypes, returnType, false);
+ }
return null;
}
@@ -177,7 +230,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
// Method to find if the output type of a drill function if of complex type
public boolean isFunctionComplexOutput(String name) {
- List<DrillFuncHolder> methods = drillFuncRegistry.getMethods(name);
+ List<DrillFuncHolder> methods = localFunctionRegistry.getMethods(name);
for (DrillFuncHolder holder : methods) {
if (holder.getReturnValue().isComplexWriter()) {
return true;
@@ -186,4 +239,257 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
return false;
}
+ public RemoteFunctionRegistry getRemoteFunctionRegistry() {
+ return remoteFunctionRegistry;
+ }
+
+ /**
+ * Using given local path to jar creates unique class loader for this jar.
+ * Class loader is closed to release opened connection to jar when validation is finished.
+ * Scan jar content to receive list of all scanned classes
+ * and starts validation process against local function registry.
+ * Checks if received list of validated function is not empty.
+ *
+ * @param path local path to jar we need to validate
+ * @return list of validated function signatures
+ */
+ public List<String> validate(Path path) throws IOException {
+ URL url = path.toUri().toURL();
+ URL[] urls = {url};
+ try (URLClassLoader classLoader = new URLClassLoader(urls)) {
+ ScanResult jarScanResult = scan(classLoader, path, urls);
+ List<String> functions = localFunctionRegistry.validate(path.getName(), jarScanResult);
+ if (functions.isEmpty()) {
+ throw new FunctionValidationException(String.format("Jar %s does not contain functions", path.getName()));
+ }
+ return functions;
+ }
+ }
+
+ /**
+ * Attempts to load and register functions from remote function registry.
+ * First checks whether any jars are missing.
+ * If so, enters synchronized block to prevent others from loading the same jars.
+ * Re-checks for missing jars inside the block in case someone has already loaded them.
+ * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
+ * Jar registration timestamp represented in milliseconds is used as suffix.
+ * Then registers all jars at the same time. Returns true when finished.
+ * If any error occurs while copying or validating a jar, logs the error and proceeds with the next jar.
+ *
+ * If no missing jars are found, checks current local registry version.
+ * Returns false if versions match, true otherwise.
+ *
+ * @param version local function registry version
+ * @return true if new jars were registered or local function registry version is different, false otherwise
+ */
+ public boolean loadRemoteFunctions(long version) {
+ List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
+ if (!missingJars.isEmpty()) {
+ synchronized (this) {
+ missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
+ List<JarScan> jars = Lists.newArrayList();
+ for (String jarName : missingJars) {
+ Path binary = null;
+ Path source = null;
+ URLClassLoader classLoader = null;
+ try {
+ binary = copyJarToLocal(jarName, remoteFunctionRegistry);
+ source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
+ URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
+ classLoader = new URLClassLoader(urls);
+ ScanResult scanResult = scan(classLoader, binary, urls);
+ localFunctionRegistry.validate(jarName, scanResult);
+ jars.add(new JarScan(jarName, scanResult, classLoader));
+ } catch (Exception e) {
+ deleteQuietlyLocalJar(binary);
+ deleteQuietlyLocalJar(source);
+ if (classLoader != null) {
+ try {
+ classLoader.close();
+ } catch (Exception ex) {
+ logger.warn("Problem during closing class loader for {}", jarName, ex);
+ }
+ }
+ logger.error("Problem during remote functions load from {}", jarName, e);
+ }
+ }
+ if (!jars.isEmpty()) {
+ localFunctionRegistry.register(jars);
+ return true;
+ }
+ }
+ }
+ return version != localFunctionRegistry.getVersion();
+ }
+
+ /**
+ * First finds path to marker file url, otherwise throws {@link JarValidationException}.
+ * Then scans jar classes according to list indicated in marker files.
+ * The jar file behind the marker file connection is closed after {@link ConfigFactory#parseURL(URL)}.
+ * This is extremely important for Windows users where system doesn't allow to delete file if it's being used.
+ *
+ * @param classLoader unique class loader for jar
+ * @param path local path to jar
+ * @param urls urls associated with the jar (ex: binary and source)
+ * @return scan result of packages, classes, annotations found in jar
+ */
+ private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException {
+ Enumeration<URL> markerFileEnumeration = classLoader.getResources(
+ CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
+ while (markerFileEnumeration.hasMoreElements()) {
+ URL markerFile = markerFileEnumeration.nextElement();
+ if (markerFile.getPath().contains(path.toUri().getPath())) {
+ URLConnection markerFileConnection = null;
+ try {
+ markerFileConnection = markerFile.openConnection();
+ DrillConfig drillConfig = DrillConfig.create(ConfigFactory.parseURL(markerFile));
+ return RunTimeScan.dynamicPackageScan(drillConfig, Sets.newHashSet(urls));
+ } finally {
+ if (markerFileConnection instanceof JarURLConnection) {
+ ((JarURLConnection) markerFileConnection).getJarFile().close(); // reuse the checked connection
+ }
+ }
+ }
+ }
+ throw new JarValidationException(String.format("Marker file %s is missing in %s",
+ CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName()));
+ }
+
+ /**
+ * Return list of jars that are missing in local function registry
+ * but present in remote function registry.
+ *
+ * @param remoteFunctionRegistry remote function registry
+ * @param localFunctionRegistry local function registry
+ * @return list of missing jars
+ */
+ private List<String> getMissingJars(RemoteFunctionRegistry remoteFunctionRegistry,
+ LocalFunctionRegistry localFunctionRegistry) {
+ List<Jar> remoteJars = remoteFunctionRegistry.getRegistry().getJarList();
+ List<String> localJars = localFunctionRegistry.getAllJarNames();
+ List<String> missingJars = Lists.newArrayList();
+ for (Jar jar : remoteJars) {
+ if (!localJars.contains(jar.getName())) {
+ missingJars.add(jar.getName());
+ }
+ }
+ return missingJars;
+ }
+
+ /**
+ * Creates local udf directory, if it doesn't exist.
+ * Checks if local udf directory is a directory and if current application has write rights on it.
+ * Attempts to clean up local udf directory in case jars were left after previous drillbit run.
+ * Local udf directory path is concatenated from drill temporary directory and ${drill.exec.udf.directory.local}.
+ *
+ * @param config drill config
+ * @return path to local udf directory
+ */
+ private Path getLocalUdfDir(DrillConfig config) {
+ tmpDir = getTmpDir(config);
+ File udfDir = new File(tmpDir, config.getString(ExecConstants.UDF_DIRECTORY_LOCAL));
+ udfDir.mkdirs();
+ String udfPath = udfDir.getPath();
+ Preconditions.checkState(udfDir.exists(), "Local udf directory [%s] must exist", udfPath);
+ Preconditions.checkState(udfDir.isDirectory(), "Local udf directory [%s] must be a directory", udfPath);
+ Preconditions.checkState(udfDir.canWrite(), "Local udf directory [%s] must be writable for application user", udfPath);
+ try {
+ FileUtils.cleanDirectory(udfDir);
+ } catch (IOException e) {
+ throw new DrillRuntimeException("Error during local udf directory clean up", e);
+ }
+ return new Path(udfDir.toURI());
+ }
+
+ /**
+ * First tries to get drill temporary directory value from environmental variable $DRILL_TMP_DIR,
+ * then from config ${drill.tmp-dir}.
+ * If value is still missing, generates directory using {@link Files#createTempDir()}.
+ * If temporary directory was generated, sets {@link #deleteTmpDir} to true
+ * to delete directory on drillbit exit.
+ * @return drill temporary directory path
+ */
+ private File getTmpDir(DrillConfig config) {
+ String drillTempDir = System.getenv("DRILL_TMP_DIR");
+
+ if (drillTempDir == null && config.hasPath(ExecConstants.DRILL_TMP_DIR)) {
+ drillTempDir = config.getString(ExecConstants.DRILL_TMP_DIR);
+ }
+
+ if (drillTempDir == null) {
+ deleteTmpDir = true;
+ return Files.createTempDir();
+ }
+
+ return new File(drillTempDir);
+ }
+
+ /**
+ * Copies jar from remote udf area to local udf area.
+ *
+ * @param jarName jar name to be copied
+ * @param remoteFunctionRegistry remote function registry
+ * @return local path to jar that was copied
+ * @throws IOException in case of problems during jar copying process
+ */
+ private Path copyJarToLocal(String jarName, RemoteFunctionRegistry remoteFunctionRegistry) throws IOException {
+ Path registryArea = remoteFunctionRegistry.getRegistryArea();
+ FileSystem fs = remoteFunctionRegistry.getFs();
+ Path remoteJar = new Path(registryArea, jarName);
+ Path localJar = new Path(localUdfDir, jarName);
+ try {
+ fs.copyToLocalFile(remoteJar, localJar);
+ } catch (IOException e) {
+ String message = String.format("Error during jar [%s] coping from [%s] to [%s]",
+ jarName, registryArea.toUri().getPath(), localUdfDir.toUri().getPath());
+ throw new IOException(message, e);
+ }
+ return localJar;
+ }
+
+ /**
+ * Deletes quietly local jar but first checks if path to jar is not null.
+ *
+ * @param jar path to jar
+ */
+ private void deleteQuietlyLocalJar(Path jar) {
+ if (jar != null) {
+ FileUtils.deleteQuietly(new File(jar.toUri().getPath()));
+ }
+ }
+
+ /**
+ * If {@link #deleteTmpDir} is set to true, deletes generated temporary directory.
+ * Otherwise cleans up {@link #localUdfDir}.
+ */
+ @Override
+ public void close() {
+ if (deleteTmpDir) {
+ FileUtils.deleteQuietly(tmpDir);
+ } else {
+ try {
+ FileUtils.cleanDirectory(new File(localUdfDir.toUri().getPath()));
+ } catch (IOException e) {
+ logger.warn("Problems during local udf directory clean up", e);
+ }
+ }
+ }
+
+ /**
+ * Fires when jar name is submitted for unregistration.
+ * Will unregister all functions associated with the jar name
+ * and delete binary and source associated with the jar from local udf directory
+ */
+ public class UnregistrationListener implements TransientStoreListener {
+
+ @Override
+ public void onChange(TransientStoreEvent event) {
+ String jarName = (String) event.getValue();
+ localFunctionRegistry.unregister(jarName);
+ String localDir = localUdfDir.toUri().getPath();
+ FileUtils.deleteQuietly(new File(localDir, jarName));
+ FileUtils.deleteQuietly(new File(localDir, JarUtil.getSourceName(jarName)));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
index 1007afc..4e5ee4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.expr.fn;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
-import java.net.URL;
import java.util.List;
import java.util.Map;
@@ -33,7 +32,6 @@ import org.codehaus.janino.Scanner;
import org.mortbay.util.IO;
import com.google.common.collect.Maps;
-import com.google.common.io.Resources;
/**
* To avoid the cost of initializing all functions up front,
@@ -42,8 +40,8 @@ import com.google.common.io.Resources;
public class FunctionInitializer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionInitializer.class);
- private String className;
-
+ private final String className;
+ private final ClassLoader classLoader;
private Map<String, CompilationUnit> functionUnits = Maps.newHashMap();
private Map<String, String> methods;
private List<String> imports;
@@ -51,13 +49,21 @@ public class FunctionInitializer {
/**
* @param className the fully qualified name of the class implementing the function
+ * @param classLoader class loader associated with the function, is unique for each jar that holds function
+ * to prevent classpath collisions during loading and unloading jars
*/
- public FunctionInitializer(String className) {
+ public FunctionInitializer(String className, ClassLoader classLoader) {
super();
this.className = className;
+ this.classLoader = classLoader;
}
/**
+ * @return returns class loader
+ */
+ public ClassLoader getClassLoader() { return classLoader; }
+
+ /**
* @return the fully qualified name of the class implementing the function
*/
public String getClassName() {
@@ -94,7 +100,7 @@ public class FunctionInitializer {
// get function body.
try {
- final Class<?> clazz = Class.forName(className);
+ final Class<?> clazz = Class.forName(className, true, classLoader);
final CompilationUnit cu = get(clazz);
if (cu == null) {
@@ -123,8 +129,7 @@ public class FunctionInitializer {
return cu;
}
- URL u = Resources.getResource(c, path);
- try (InputStream is = Resources.asByteSource(u).openStream()) {
+ try (InputStream is = c.getResourceAsStream(path)) {
if (is == null) {
throw new IOException(String.format(
"Failure trying to located source code for Class %s, tried to read on classpath location %s", c.getName(),
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java
new file mode 100644
index 0000000..4b93c88
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+
+/**
+ * Immutable value object that bundles everything known about a single function implementation:
+ * <ol>
+ * <li>the function name</li>
+ * <li>the function signature, i.e. string representation of function name and its input parameters</li>
+ * <li>the {@link DrillFuncHolder} associated with the function</li>
+ * </ol>
+ */
+public class FunctionHolder {
+
+  private final String name;
+  private final String signature;
+  private final DrillFuncHolder holder;
+
+  /**
+   * @param name function name
+   * @param signature function signature
+   * @param holder function holder associated with the function
+   */
+  public FunctionHolder(String name, String signature, DrillFuncHolder holder) {
+    this.holder = holder;
+    this.signature = signature;
+    this.name = name;
+  }
+
+  /**
+   * @return function name
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * @return function signature
+   */
+  public String getSignature() {
+    return this.signature;
+  }
+
+  /**
+   * @return function holder associated with the function
+   */
+  public DrillFuncHolder getHolder() {
+    return this.holder;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
new file mode 100644
index 0000000..005c4e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Function registry holder stores function implementations by jar name, function name.
+ * Contains two maps that hold data by jars and functions respectively.
+ * Jars map contains each jar as a key and map of all its functions with collection of function signatures as value.
+ * Functions map contains function name as key and map of its signatures and function holder as value.
+ * All maps and collections used are concurrent to guarantee memory consistency effects.
+ * Such structure is chosen to achieve maximum speed while retrieving data by jar or by function name,
+ * since we expect infrequent registry changes.
+ * Holder is designed to allow concurrent reads and single writes to keep data consistent.
+ * This is achieved by {@link ReadWriteLock} implementation usage.
+ * Holder has a version number which changes every time jars are added or removed. Initial version number is 0.
+ * The version can also be returned together with registry data, so the caller knows which registry version the data is based on.
+ *
+ * Structure example:
+ *
+ * JARS
+ * built-in -> upper -> upper(VARCHAR-REQUIRED)
+ * -> lower -> lower(VARCHAR-REQUIRED)
+ *
+ * First.jar -> upper -> upper(VARCHAR-OPTIONAL)
+ * -> custom_upper -> custom_upper(VARCHAR-REQUIRED)
+ * -> custom_upper(VARCHAR-OPTIONAL)
+ *
+ * Second.jar -> lower -> lower(VARCHAR-OPTIONAL)
+ * -> custom_upper -> custom_upper(VARCHAR-REQUIRED)
+ * -> custom_upper(VARCHAR-OPTIONAL)
+ *
+ * FUNCTIONS
+ * upper -> upper(VARCHAR-REQUIRED) -> function holder for upper(VARCHAR-REQUIRED)
+ * -> upper(VARCHAR-OPTIONAL) -> function holder for upper(VARCHAR-OPTIONAL)
+ *
+ * lower -> lower(VARCHAR-REQUIRED) -> function holder for lower(VARCHAR-REQUIRED)
+ * -> lower(VARCHAR-OPTIONAL) -> function holder for lower(VARCHAR-OPTIONAL)
+ *
+ * custom_upper -> custom_upper(VARCHAR-REQUIRED) -> function holder for custom_upper(VARCHAR-REQUIRED)
+ * -> custom_upper(VARCHAR-OPTIONAL) -> function holder for custom_upper(VARCHAR-OPTIONAL)
+ *
+ * custom_lower -> custom_lower(VARCHAR-REQUIRED) -> function holder for custom_lower(VARCHAR-REQUIRED)
+ * -> custom_lower(VARCHAR-OPTIONAL) -> function holder for custom_lower(VARCHAR-OPTIONAL)
+ *
+ * where
+ * First.jar is jar name represented by String
+ * upper is function name represented by String
+ * upper(VARCHAR-REQUIRED) is signature name represented by String which consist of function name, list of input parameters
+ * function holder for upper(VARCHAR-REQUIRED) is {@link DrillFuncHolder} initiated for each function.
+ *
+ */
+public class FunctionRegistryHolder {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionRegistryHolder.class);
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
+ private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
+ private long version = 0;
+
+ // key: jar name, value: Map<function name, Queue<function signature>> with functions registered from that jar
+ private final Map<String, Map<String, Queue<String>>> jars;
+
+ // key: function name, value: Map<function signature, function holder>
+ private final Map<String, Map<String, DrillFuncHolder>> functions;
+
+ /**
+  * Creates an empty registry holder; both {@link #jars} and {@link #functions} start as empty concurrent maps.
+  */
+ public FunctionRegistryHolder() {
+   this.jars = Maps.newConcurrentMap();
+   this.functions = Maps.newConcurrentMap();
+ }
+
+ /**
+  * Reads current version of the local function registry under the read lock,
+  * so several users at a time can get this data.
+  *
+  * @return local function registry version number
+  */
+ public long getVersion() {
+   final long currentVersion;
+   try (AutoCloseableLock ignored = readLock.open()) {
+     currentVersion = version;
+   }
+   return currentVersion;
+ }
+
+ /**
+  * Registers the given jars together with their functions.
+  * A jar that is already registered under the same name is removed first, along with its functions.
+  * Afterwards the jar is put into {@link #jars} and its functions are added
+  * using {@link #addFunctions(Map, List)}.
+  * Registry version is incremented by 1 once per call (not once per jar), and only if at least one jar was passed.
+  * This is write operation, so only one user at a time can perform it,
+  * others will wait till the first user completes his action.
+  *
+  * @param newJars jars and list of their function holders, each contains function name, signature and holder
+  */
+ public void addJars(Map<String, List<FunctionHolder>> newJars) {
+   try (AutoCloseableLock ignored = writeLock.open()) {
+     if (newJars.isEmpty()) {
+       return;
+     }
+     for (Map.Entry<String, List<FunctionHolder>> jarEntry : newJars.entrySet()) {
+       final String jarName = jarEntry.getKey();
+       final List<FunctionHolder> jarFunctions = jarEntry.getValue();
+       removeAllByJar(jarName);
+       final Map<String, Queue<String>> registeredJar = Maps.newConcurrentMap();
+       jars.put(jarName, registeredJar);
+       addFunctions(registeredJar, jarFunctions);
+     }
+     version++;
+   }
+ }
+
+ /**
+  * Removes jar from {@link #jars} and all functions associated with the jar from {@link #functions}.
+  * Registry version is incremented by 1 only if the jar was actually removed.
+  * This is write operation, so only one user at a time can perform it,
+  * others will wait till the first user completes his action.
+  *
+  * @param jarName jar name to be removed
+  */
+ public void removeJar(String jarName) {
+   try (AutoCloseableLock ignored = writeLock.open()) {
+     final boolean removed = removeAllByJar(jarName);
+     if (removed) {
+       version++;
+     }
+   }
+ }
+
+ /**
+  * Returns names of all jars present in {@link #jars} as a new list.
+  * This is read operation, so several users can get this data at the same time.
+  *
+  * @return list of all jar names
+  */
+ public List<String> getAllJarNames() {
+   try (AutoCloseableLock ignored = readLock.open()) {
+     final List<String> jarNames = Lists.newArrayList(jars.keySet());
+     return jarNames;
+   }
+ }
+
+ /**
+  * Returns names of all functions registered for the given jar in {@link #jars}.
+  * If the jar is not registered, an empty list is returned.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @param jarName jar name
+  * @return list of function names associated with the jar
+  */
+ public List<String> getFunctionNamesByJar(String jarName) {
+   try (AutoCloseableLock ignored = readLock.open()) {
+     final Map<String, Queue<String>> jarFunctions = jars.get(jarName);
+     if (jarFunctions == null) {
+       return Lists.<String>newArrayList();
+     }
+     return Lists.newArrayList(jarFunctions.keySet());
+   }
+ }
+
+ /**
+  * Collects all registered functions together with their function holders
+  * into guava {@link ListMultimap} keyed by function name.
+  * Returns empty {@link ListMultimap} if no functions are present.
+  * If version holder is not null, it is updated with current registry version number.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @param version version holder, may be null
+  * @return all functions with their holders
+  */
+ public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicLong version) {
+   try (AutoCloseableLock ignored = readLock.open()) {
+     if (version != null) {
+       version.set(this.version);
+     }
+     final ListMultimap<String, DrillFuncHolder> result = ArrayListMultimap.create();
+     for (Map.Entry<String, Map<String, DrillFuncHolder>> functionEntry : functions.entrySet()) {
+       final String functionName = functionEntry.getKey();
+       final List<DrillFuncHolder> holders = Lists.newArrayList(functionEntry.getValue().values());
+       result.putAll(functionName, holders);
+     }
+     return result;
+   }
+ }
+
+ /**
+  * Same as {@link #getAllFunctionsWithHolders(AtomicLong)} but without reporting registry version number.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @return all functions with their holders
+  */
+ public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders() {
+   final AtomicLong noVersionHolder = null;
+   return getAllFunctionsWithHolders(noVersionHolder);
+ }
+
+ /**
+  * Collects all registered functions together with their function signatures
+  * into guava {@link ListMultimap} keyed by function name.
+  * Returns empty {@link ListMultimap} if no functions are present.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @return all functions with their signatures
+  */
+ public ListMultimap<String, String> getAllFunctionsWithSignatures() {
+   try (AutoCloseableLock ignored = readLock.open()) {
+     final ListMultimap<String, String> result = ArrayListMultimap.create();
+     for (Map.Entry<String, Map<String, DrillFuncHolder>> functionEntry : functions.entrySet()) {
+       final String functionName = functionEntry.getKey();
+       final List<String> signatures = Lists.newArrayList(functionEntry.getValue().keySet());
+       result.putAll(functionName, signatures);
+     }
+     return result;
+   }
+ }
+
+ /**
+  * Looks up all function holders registered under the given function name.
+  * Returns empty list if the function is not present.
+  * If version holder is not null, it is updated with current registry version number.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @param functionName function name
+  * @param version version holder, may be null
+  * @return list of function holders
+  */
+ public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicLong version) {
+   try (AutoCloseableLock ignored = readLock.open()) {
+     if (version != null) {
+       version.set(this.version);
+     }
+     final Map<String, DrillFuncHolder> holdersBySignature = functions.get(functionName);
+     if (holdersBySignature == null) {
+       return Lists.<DrillFuncHolder>newArrayList();
+     }
+     return Lists.newArrayList(holdersBySignature.values());
+   }
+ }
+
+ /**
+  * Same as {@link #getHoldersByFunctionName(String, AtomicLong)} but without reporting registry version number.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @param functionName function name
+  * @return list of function holders
+  */
+ public List<DrillFuncHolder> getHoldersByFunctionName(String functionName) {
+   final AtomicLong noVersionHolder = null;
+   return getHoldersByFunctionName(functionName, noVersionHolder);
+ }
+
+ /**
+  * Checks if jar is present in {@link #jars}.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @param jarName jar name
+  * @return true if jar exists, else false
+  */
+ public boolean containsJar(String jarName) {
+   final boolean present;
+   try (AutoCloseableLock ignored = readLock.open()) {
+     present = jars.containsKey(jarName);
+   }
+   return present;
+ }
+
+ /**
+  * Returns number of function names stored in {@link #functions}.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @return quantity of functions
+  */
+ public int functionsSize() {
+   final int size;
+   try (AutoCloseableLock ignored = readLock.open()) {
+     size = functions.size();
+   }
+   return size;
+ }
+
+ /**
+  * Finds the jar in {@link #jars} that contains passed function signature.
+  * For each jar, the signatures registered under the function name are looked up
+  * and checked for the passed function signature.
+  * Returns name of the first jar with matching function signature, else null.
+  * This is read operation, so several users can perform this operation at the same time.
+  *
+  * @param functionName function name
+  * @param functionSignature function signature
+  * @return jar name, or null if no jar contains the signature
+  */
+ public String getJarNameByFunctionSignature(String functionName, String functionSignature) {
+   try (AutoCloseableLock ignored = readLock.open()) {
+     for (Map.Entry<String, Map<String, Queue<String>>> jarEntry : jars.entrySet()) {
+       final Queue<String> signatures = jarEntry.getValue().get(functionName);
+       if (signatures == null || !signatures.contains(functionSignature)) {
+         continue;
+       }
+       return jarEntry.getKey();
+     }
+     return null;
+   }
+ }
+
+ /**
+  * Registers each passed function in two places:
+  * its name and signature are recorded in the passed jar,
+  * its name, signature and holder are recorded in {@link #functions}.
+  *
+  * @param jar jar where function to be added
+  * @param newFunctions collection of function holders, each contains function name, signature and holder.
+  */
+ private void addFunctions(Map<String, Queue<String>> jar, List<FunctionHolder> newFunctions) {
+   for (FunctionHolder newFunction : newFunctions) {
+     final String name = newFunction.getName();
+     final String signature = newFunction.getSignature();
+
+     // record signature under the jar
+     Queue<String> jarSignatures = jar.get(name);
+     if (jarSignatures == null) {
+       jarSignatures = Queues.newConcurrentLinkedQueue();
+       jar.put(name, jarSignatures);
+     }
+     jarSignatures.add(signature);
+
+     // record holder under the function name
+     Map<String, DrillFuncHolder> holdersBySignature = functions.get(name);
+     if (holdersBySignature == null) {
+       holdersBySignature = Maps.newConcurrentMap();
+       functions.put(name, holdersBySignature);
+     }
+     holdersBySignature.put(signature, newFunction.getHolder());
+   }
+ }
+
+ /**
+  * Removes jar from {@link #jars} and all functions associated with the jar from {@link #functions}.
+  * Since each jar is loaded with separate class loader, before removing
+  * we need to close class loader to release opened connection to jar.
+  * Every distinct class loader found among the jar functions is closed exactly one time,
+  * failures during closing are logged as warnings and do not interrupt the removal.
+  * Function names that are no longer present in {@link #functions} are skipped.
+  * Must be called while holding the write lock.
+  *
+  * @param jarName jar name to be removed
+  * @return true if jar was removed, false otherwise
+  */
+ private boolean removeAllByJar(String jarName) {
+   Map<String, Queue<String>> jar = jars.remove(jarName);
+   if (jar == null) {
+     return false;
+   }
+
+   // Class loaders closed during this call, tracked by identity: AutoCloseable.close()
+   // is not required to be idempotent, so the same loader must not be closed per function name.
+   Map<ClassLoader, Boolean> closedClassLoaders = Maps.newIdentityHashMap();
+
+   for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) {
+     final String function = functionEntry.getKey();
+     Map<String, DrillFuncHolder> functionHolders = functions.get(function);
+     if (functionHolders == null) {
+       // Function entry is already gone, e.g. removed together with another jar
+       // that registered the same function signatures; nothing left to clean up.
+       continue;
+     }
+     Queue<String> functionSignatures = functionEntry.getValue();
+     for (Map.Entry<String, DrillFuncHolder> entry : functionHolders.entrySet()) {
+       if (functionSignatures.contains(entry.getKey())) {
+         ClassLoader classLoader = entry.getValue().getClassLoader();
+         // put() returns null only the first time this loader is seen
+         if (classLoader instanceof AutoCloseable && closedClassLoaders.put(classLoader, Boolean.TRUE) == null) {
+           try {
+             ((AutoCloseable) classLoader).close();
+           } catch (Exception e) {
+             logger.warn("Problem during closing class loader", e);
+           }
+         }
+         break;
+       }
+     }
+     functionHolders.keySet().removeAll(functionSignatures);
+
+     if (functionHolders.isEmpty()) {
+       functions.remove(function);
+     }
+   }
+   return true;
+ }
+}