You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/10/18 23:45:29 UTC

[09/11] drill git commit: DRILL-4726: Dynamic UDF Support

DRILL-4726: Dynamic UDF Support

1) Configuration / parsing / options / protos
2) Zookeeper integration
3) Registration / unregistration / lazy-init
4) Unit tests

This closes #574


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/89f2633f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/89f2633f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/89f2633f

Branch: refs/heads/master
Commit: 89f2633f612a645666de8f51dcb19c6f8044a95e
Parents: 8461d10
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Mon Aug 22 13:29:30 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Oct 18 10:47:52 2016 -0700

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java |    9 +
 .../drill/common/scanner/RunTimeScan.java       |   21 +
 distribution/src/resources/drill-config.sh      |   18 +
 distribution/src/resources/sqlline.bat          |    5 +
 exec/java-exec/src/main/codegen/data/Parser.tdd |    7 +-
 .../src/main/codegen/includes/parserImpls.ftl   |   40 +
 .../org/apache/drill/exec/ExecConstants.java    |   27 +-
 .../drill/exec/coord/zk/ZkEphemeralStore.java   |   17 +-
 .../drill/exec/coord/zk/ZookeeperClient.java    |  101 +-
 .../exception/FunctionNotFoundException.java    |   27 +
 .../exception/FunctionValidationException.java  |   28 +
 .../exec/exception/JarValidationException.java  |   28 +
 .../exception/VersionMismatchException.java     |   33 +
 .../drill/exec/expr/fn/DrillFuncHolder.java     |   28 +
 .../exec/expr/fn/DrillFunctionRegistry.java     |  221 ---
 .../exec/expr/fn/DrillSimpleFuncHolder.java     |    6 +-
 .../drill/exec/expr/fn/FunctionConverter.java   |    4 +-
 .../expr/fn/FunctionImplementationRegistry.java |  342 ++++-
 .../drill/exec/expr/fn/FunctionInitializer.java |   21 +-
 .../exec/expr/fn/registry/FunctionHolder.java   |   54 +
 .../fn/registry/FunctionRegistryHolder.java     |  377 +++++
 .../drill/exec/expr/fn/registry/JarScan.java    |   53 +
 .../expr/fn/registry/LocalFunctionRegistry.java |  329 ++++
 .../fn/registry/RemoteFunctionRegistry.java     |  269 ++++
 .../org/apache/drill/exec/ops/QueryContext.java |    5 +
 .../exec/planner/sql/DrillOperatorTable.java    |   24 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   27 +-
 .../drill/exec/planner/sql/SqlConverter.java    |    9 +
 .../sql/handlers/CreateFunctionHandler.java     |  328 ++++
 .../sql/handlers/DropFunctionHandler.java       |  167 ++
 .../sql/parser/CompoundIdentifierConverter.java |    2 +
 .../planner/sql/parser/SqlCreateFunction.java   |   79 +
 .../planner/sql/parser/SqlDropFunction.java     |   79 +
 .../rpc/user/InboundImpersonationManager.java   |    6 +-
 .../org/apache/drill/exec/server/Drillbit.java  |    1 +
 .../drill/exec/server/DrillbitContext.java      |    5 +
 .../exec/server/options/OptionValidator.java    |   14 +
 .../server/options/SystemOptionManager.java     |    3 +-
 .../exec/server/options/TypeValidators.java     |   51 +-
 .../exec/store/sys/BasePersistentStore.java     |   18 +-
 .../drill/exec/store/sys/PersistentStore.java   |   21 +
 .../exec/store/sys/store/DataChangeVersion.java |   32 +
 .../sys/store/ZookeeperPersistentStore.java     |   36 +-
 .../exec/testing/store/NoWriteLocalStore.java   |   61 +-
 .../org/apache/drill/exec/util/JarUtil.java     |   33 +
 .../src/main/resources/drill-module.conf        |   29 +
 .../java/org/apache/drill/BaseTestQuery.java    |   14 -
 .../org/apache/drill/TestDynamicUDFSupport.java |  801 ++++++++++
 .../java/org/apache/drill/exec/ExecTest.java    |   22 +-
 .../exec/coord/zk/TestZookeeperClient.java      |   49 +-
 .../fn/registry/FunctionRegistryHolderTest.java |  279 ++++
 .../exec/physical/impl/TestSimpleFunctions.java |   20 +-
 .../resources/jars/DrillUDF-1.0-sources.jar     |  Bin 0 -> 1892 bytes
 .../src/test/resources/jars/DrillUDF-1.0.jar    |  Bin 0 -> 3146 bytes
 .../resources/jars/DrillUDF-2.0-sources.jar     |  Bin 0 -> 1891 bytes
 .../src/test/resources/jars/DrillUDF-2.0.jar    |  Bin 0 -> 3142 bytes
 .../jars/DrillUDF_Copy-1.0-sources.jar          |  Bin 0 -> 1892 bytes
 .../test/resources/jars/DrillUDF_Copy-1.0.jar   |  Bin 0 -> 3185 bytes
 .../jars/DrillUDF_DupFunc-1.0-sources.jar       |  Bin 0 -> 1888 bytes
 .../resources/jars/DrillUDF_DupFunc-1.0.jar     |  Bin 0 -> 3201 bytes
 .../jars/DrillUDF_Empty-1.0-sources.jar         |  Bin 0 -> 536 bytes
 .../test/resources/jars/DrillUDF_Empty-1.0.jar  |  Bin 0 -> 1863 bytes
 .../jars/DrillUDF_NoMarkerFile-1.0-sources.jar  |  Bin 0 -> 1715 bytes
 .../jars/DrillUDF_NoMarkerFile-1.0.jar          |  Bin 0 -> 3084 bytes
 .../resources/jars/v2/DrillUDF-1.0-sources.jar  |  Bin 0 -> 1899 bytes
 .../src/test/resources/jars/v2/DrillUDF-1.0.jar |  Bin 0 -> 3215 bytes
 .../org/apache/drill/jdbc/ITTestShadedJar.java  |   19 +
 .../drill/exec/proto/SchemaUserBitShared.java   |  231 +++
 .../apache/drill/exec/proto/UserBitShared.java  | 1439 +++++++++++++++++-
 .../org/apache/drill/exec/proto/beans/Jar.java  |  195 +++
 .../apache/drill/exec/proto/beans/Registry.java |  175 +++
 protocol/src/main/protobuf/UserBitShared.proto  |   20 +
 72 files changed, 5986 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 43d05c3..6828718 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -164,6 +164,15 @@ public class DrillConfig extends NestedConfig {
   }
 
   /**
+   * Creates a drill configuration using the provided config file.
+   * @param config custom configuration file
+   * @return {@link DrillConfig} instance
+   */
+  public static DrillConfig create(Config config) {
+    return new DrillConfig(config.resolve(), true);
+  }
+
+  /**
    * @param overrideFileResourcePathname
    *          see {@link #create(String)}'s {@code overrideFileResourcePathname}
    * @param overriderProps

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
index 1d95b04..7faa0fb 100644
--- a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
+++ b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
@@ -20,7 +20,9 @@ package org.apache.drill.common.scanner;
 import java.net.URL;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 
@@ -75,4 +77,23 @@ public class RunTimeScan {
     }
   }
 
+  /**
+   * Scans packages retrieved from config.
+   * Returns scan result with list of packages, classes and annotations found.
+   * Is used to scan specific jars not associated with classpath at runtime.
+   *
+   * @param config to retrieve the packages to scan
+   * @param markedPath list of paths where to scan
+   * @return the scan result with list of packages, classes and annotations found
+   */
+  public static ScanResult dynamicPackageScan(DrillConfig config, Set<URL> markedPath) {
+    List<String> packagePrefixes = ClassPathScanner.getPackagePrefixes(config);
+    return ClassPathScanner.scan(
+        markedPath,
+        packagePrefixes,
+        Lists.<String>newArrayList(),
+        PRESCANNED.getScannedAnnotations(),
+        ClassPathScanner.emptyResult());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/distribution/src/resources/drill-config.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-config.sh b/distribution/src/resources/drill-config.sh
index 737be36..7a72e27 100644
--- a/distribution/src/resources/drill-config.sh
+++ b/distribution/src/resources/drill-config.sh
@@ -324,6 +324,22 @@ if [ -n "$DRILL_CLASSPATH" ]; then
   CP="$CP:$DRILL_CLASSPATH"
 fi
 
+# Drill temporary directory is used as base for temporary storage of Dynamic UDF jars.
+# If tmp dir is given, it must exist.
+if [ -n "$DRILL_TMP_DIR" ]; then
+  if [[ ! -d "$DRILL_TMP_DIR" ]]; then
+    fatal_error "Temporary dir does not exist:" $DRILL_TMP_DIR
+  fi
+else
+  # Otherwise, use the default
+  DRILL_TMP_DIR="/tmp"
+fi
+
+mkdir -p "$DRILL_TMP_DIR"
+if [[ ! -d "$DRILL_TMP_DIR" || ! -w "$DRILL_TMP_DIR" ]]; then
+  fatal_error "Temporary directory does not exist or is not writable: $DRILL_TMP_DIR"
+fi
+
 # Test for cygwin
 is_cygwin=false
 case "`uname`" in
@@ -371,6 +387,7 @@ if $is_cygwin; then
   DRILL_HOME=`cygpath -w "$DRILL_HOME"`
   DRILL_CONF_DIR=`cygpath -w "$DRILL_CONF_DIR"`
   DRILL_LOG_DIR=`cygpath -w "$DRILL_LOG_DIR"`
+  DRILL_TMP_DIR=`cygpath -w "$DRILL_TMP_DIR"`
   CP=`cygpath -w -p "$CP"`
   if [ -z "$HADOOP_HOME" ]; then
     export HADOOP_HOME=${DRILL_HOME}/winutils
@@ -391,6 +408,7 @@ export is_cygwin
 export DRILL_HOME
 export DRILL_CONF_DIR
 export DRILL_LOG_DIR
+export DRILL_TMP_DIR
 export CP
 # DRILL-4870: Don't export JAVA_HOME. Java can find it just fine from the java
 # command. If we attempt to work it out, we do so incorrectly for the Mac.

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/distribution/src/resources/sqlline.bat
----------------------------------------------------------------------
diff --git a/distribution/src/resources/sqlline.bat b/distribution/src/resources/sqlline.bat
index a0efdf1..f008604 100755
--- a/distribution/src/resources/sqlline.bat
+++ b/distribution/src/resources/sqlline.bat
@@ -114,6 +114,11 @@ if "test%DRILL_LOG_DIR%" == "test" (
   set DRILL_LOG_DIR=%DRILL_HOME%\log
 )
 
+@rem Drill temporary directory is used as base for temporary storage of Dynamic UDF jars.
+if "test%DRILL_TMP_DIR%" == "test" (
+  set DRILL_TMP_DIR=%TEMP%
+)
+
 rem ----
 rem Deal with Hadoop JARs, if HADOOP_HOME was specified
 rem ----

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd
index ce3ee4c..6c23808 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -38,7 +38,8 @@
     "REFRESH",
     "METADATA",
     "DATABASE",
-    "IF"
+    "IF",
+    "JAR"
   ]
 
   # List of methods for parsing custom SQL statements.
@@ -53,7 +54,9 @@
     "SqlShowFiles()",
     "SqlCreateTable()",
     "SqlDropTable()",
-    "SqlRefreshMetadata()"
+    "SqlRefreshMetadata()",
+    "SqlCreateFunction()",
+    "SqlDropFunction()"
   ]
 
   # List of methods for parsing custom literals.

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 9901098..0017446 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -297,4 +297,44 @@ SqlNode SqlDescribeSchema() :
    {
         return new SqlDescribeSchema(pos, schema);
    }
+}
+
+/**
+* Parse create UDF statement
+* CREATE FUNCTION USING JAR 'jar_name'
+*/
+SqlNode SqlCreateFunction() :
+{
+   SqlParserPos pos;
+   SqlNode jar;
+}
+{
+   <CREATE> { pos = getPos(); }
+   <FUNCTION>
+   <USING>
+   <JAR>
+   jar = StringLiteral()
+   {
+       return new SqlCreateFunction(pos, jar);
+   }
+}
+
+/**
+* Parse drop UDF statement
+* DROP FUNCTION USING JAR 'jar_name'
+*/
+SqlNode SqlDropFunction() :
+{
+   SqlParserPos pos;
+   SqlNode jar;
+}
+{
+   <DROP> { pos = getPos(); }
+   <FUNCTION>
+   <USING>
+   <JAR>
+   jar = StringLiteral()
+   {
+       return new SqlDropFunction(pos, jar);
+   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index d6a210a..0f2321b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.rpc.user.InboundImpersonationManager;
 import org.apache.drill.exec.server.options.OptionValidator;
-import org.apache.drill.exec.server.options.TypeValidators.AdminOptionValidator;
 import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
 import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator;
 import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValidator;
@@ -106,10 +105,23 @@ public interface ExecConstants {
   String RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS =
       "drill.exec.debug.return_error_for_failure_in_cancelled_fragments";
 
+  String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types";
 
+  /**
+   * Configuration properties connected with dynamic UDFs support
+   */
+  String UDF_RETRY_ATTEMPTS = "drill.exec.udf.retry-attempts";
+  String UDF_DIRECTORY_FS = "drill.exec.udf.directory.fs";
+  String UDF_DIRECTORY_ROOT = "drill.exec.udf.directory.root";
+  String UDF_DIRECTORY_LOCAL = "drill.exec.udf.directory.local";
+  String UDF_DIRECTORY_STAGING = "drill.exec.udf.directory.staging";
+  String UDF_DIRECTORY_REGISTRY = "drill.exec.udf.directory.registry";
+  String UDF_DIRECTORY_TMP = "drill.exec.udf.directory.tmp";
 
-
-  String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types";
+  /**
+   * Local temporary directory is used as base for temporary storage of Dynamic UDF jars.
+   */
+  String DRILL_TMP_DIR = "drill.tmp-dir";
 
   String OUTPUT_FORMAT_OPTION = "store.format";
   OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
@@ -296,15 +308,13 @@ public interface ExecConstants {
    * such as changing system options.
    */
   String ADMIN_USERS_KEY = "security.admin.users";
-  StringValidator ADMIN_USERS_VALIDATOR =
-      new AdminOptionValidator(ADMIN_USERS_KEY, ImpersonationUtil.getProcessUserName());
+  StringValidator ADMIN_USERS_VALIDATOR = new StringValidator(ADMIN_USERS_KEY, ImpersonationUtil.getProcessUserName(), true);
 
   /**
    * Option whose value is a comma separated list of admin usergroups.
    */
   String ADMIN_USER_GROUPS_KEY = "security.admin.user_groups";
-  StringValidator ADMIN_USER_GROUPS_VALIDATOR = new AdminOptionValidator(ADMIN_USER_GROUPS_KEY, "");
-
+  StringValidator ADMIN_USER_GROUPS_VALIDATOR = new StringValidator(ADMIN_USER_GROUPS_KEY, "", true);
   /**
    * Option whose value is a string representing list of inbound impersonation policies.
    *
@@ -337,4 +347,7 @@ public interface ExecConstants {
   String CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS = "prepare.statement.create_timeout_ms";
   OptionValidator CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR =
       new PositiveLongValidator(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS, Integer.MAX_VALUE, 10000);
+
+  String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support";
+  BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
index 94e03ad..f485e9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
@@ -88,16 +88,17 @@ public class ZkEphemeralStore<V> extends BaseTransientStore<V> {
 
   @Override
   public V putIfAbsent(final String key, final V value) {
-    final V old = get(key);
-    if (old == null) {
-      try {
-        final byte[] bytes = config.getSerializer().serialize(value);
-        getClient().put(key, bytes);
-      } catch (final IOException e) {
-        throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
+    try {
+      final InstanceSerializer<V> serializer = config.getSerializer();
+      final byte[] bytes = serializer.serialize(value);
+      final byte[] data = getClient().putIfAbsent(key, bytes);
+      if (data == null) {
+        return null;
       }
+      return serializer.deserialize(data);
+    } catch (final IOException e) {
+      throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
     }
-    return old;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
index 2debf43..610a2b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -31,8 +31,12 @@ import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.drill.common.collections.ImmutableEntry;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * A namespace aware Zookeeper client.
@@ -133,13 +137,52 @@ public class ZookeeperClient implements AutoCloseable {
    * the check is eventually consistent.
    *
    * @param path  target path
+   * @param consistent consistency flag
    */
   public byte[] get(final String path, final boolean consistent) {
+    return get(path, consistent, null);
+  }
+
+  /**
+   * Returns the value corresponding to the given key, null otherwise.
+   *
+   * The check is consistent as it is made against Zookeeper directly.
+   *
+   * Passes version holder to get data change version.
+   *
+   * @param path  target path
+   * @param version version holder
+   */
+  public byte[] get(final String path, DataChangeVersion version) {
+    return get(path, true, version);
+  }
+
+  /**
+   * Returns the value corresponding to the given key, null otherwise.
+   *
+   * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise,
+   * the check is eventually consistent.
+   *
+   * If consistency flag is set to true and version holder is not null, passes version holder to get data change version.
+   * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed.
+   * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
+   *
+   * @param path  target path
+   * @param consistent consistency check
+   * @param version version holder
+   */
+  public byte[] get(final String path, final boolean consistent, final DataChangeVersion version) {
     Preconditions.checkNotNull(path, "path is required");
 
     final String target = PathUtils.join(root, path);
     if (consistent) {
       try {
+        if (version != null) {
+          Stat stat = new Stat();
+          final byte[] bytes = curator.getData().storingStatIn(stat).forPath(target);
+          version.setVersion(stat.getVersion());
+          return bytes;
+        }
         return curator.getData().forPath(target);
       } catch (final Exception ex) {
         throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex);
@@ -179,6 +222,26 @@ public class ZookeeperClient implements AutoCloseable {
    * @param data  data to store
    */
   public void put(final String path, final byte[] data) {
+    put(path, data, null);
+  }
+
+  /**
+   * Puts the given byte sequence into the given path.
+   *
+   * If path does not exists, this call creates it.
+   *
+   * If version holder is not null and path already exists, passes given version for comparison.
+   * Zookeeper maintains stat structure that holds version number which increases each time znode data change is performed.
+   * If we pass version that doesn't match the actual version of the data,
+   * the update will fail {@link org.apache.zookeeper.KeeperException.BadVersionException}.
+   * We catch such exception and re-throw it as {@link VersionMismatchException}.
+   * Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
+   *
+   * @param path  target path
+   * @param data  data to store
+   * @param version version holder
+   */
+  public void put(final String path, final byte[] data, DataChangeVersion version) {
     Preconditions.checkNotNull(path, "path is required");
     Preconditions.checkNotNull(data, "data is required");
 
@@ -199,9 +262,45 @@ public class ZookeeperClient implements AutoCloseable {
         }
       }
       if (hasNode) {
-        curator.setData().forPath(target, data);
+        if (version != null) {
+          try {
+            curator.setData().withVersion(version.getVersion()).forPath(target, data);
+          } catch (final KeeperException.BadVersionException e) {
+            throw new VersionMismatchException("Unable to put data. Version mismatch is detected.", version.getVersion(), e);
+          }
+        } else {
+          curator.setData().forPath(target, data);
+        }
       }
       getCache().rebuildNode(target);
+    } catch (final VersionMismatchException e) {
+      throw e;
+    } catch (final Exception e) {
+      throw new DrillRuntimeException("unable to put ", e);
+    }
+  }
+
+  /**
+   * Puts the given byte sequence into the given path if path is does not exist.
+   *
+   * @param path  target path
+   * @param data  data to store
+   * @return null if path was created, else data stored for the given path
+   */
+  public byte[] putIfAbsent(final String path, final byte[] data) {
+    Preconditions.checkNotNull(path, "path is required");
+    Preconditions.checkNotNull(data, "data is required");
+
+    final String target = PathUtils.join(root, path);
+    try {
+      try {
+        curator.create().withMode(mode).forPath(target, data);
+        getCache().rebuildNode(target);
+        return null;
+      } catch (NodeExistsException e) {
+        // do nothing
+      }
+      return curator.getData().forPath(target);
     } catch (final Exception e) {
       throw new DrillRuntimeException("unable to put ", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
new file mode 100644
index 0000000..0d59cc8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class FunctionNotFoundException extends DrillRuntimeException {
+
+  public FunctionNotFoundException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java
new file mode 100644
index 0000000..7475e24
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionValidationException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class FunctionValidationException extends DrillRuntimeException {
+
+  public FunctionValidationException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java
new file mode 100644
index 0000000..a6fa407
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/JarValidationException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class JarValidationException extends DrillRuntimeException {
+
+  public JarValidationException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java
new file mode 100644
index 0000000..796f410
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/VersionMismatchException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class VersionMismatchException extends DrillRuntimeException {
+
+  public VersionMismatchException(String message, int expectedVersion, Throwable cause) {
+    super(message + ". Expected version : " + expectedVersion, cause);
+  }
+
+  public VersionMismatchException(String message, int expectedVersion) {
+    super(message + ". Expected version : " + expectedVersion);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 869a4ac..fc51d03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -132,6 +132,34 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
     return attributes.isDeterministic();
   }
 
+  /**
+   * Generates string representation of function input parameters:
+   * PARAMETER_TYPE_1-PARAMETER_MODE_1,PARAMETER_TYPE_2-PARAMETER_MODE_2
+   * Example: VARCHAR-REQUIRED,VARCHAR-OPTIONAL
+   * Returns empty string if function has no input parameters.
+   *
+   * @return string representation of function input parameters
+   */
+  public String getInputParameters() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("");
+    for (ValueReference ref : parameters) {
+      final MajorType type = ref.getType();
+      builder.append(",");
+      builder.append(type.getMinorType().toString());
+      builder.append("-");
+      builder.append(type.getMode().toString());
+    }
+    return builder.length() == 0 ? builder.toString() : builder.substring(1);
+  }
+
+  /**
+   * @return instance of class loader used to load function
+   */
+  public ClassLoader getClassLoader() {
+    return initializer.getClassLoader();
+  }
+
   protected JVar[] declareWorkspaceVariables(ClassGenerator<?> g) {
     JVar[] workspaceJVars = new JVar[workspaceVars.length];
     for (int i = 0; i < workspaceVars.length; i++) {

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
deleted file mode 100644
index f58d5a5..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFunctionRegistry.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.expr.fn;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
-import org.apache.drill.common.scanner.persistence.ScanResult;
-import org.apache.drill.exec.planner.logical.DrillConstExecutor;
-import org.apache.drill.exec.planner.sql.DrillOperatorTable;
-import org.apache.drill.exec.planner.sql.DrillSqlAggOperator;
-import org.apache.drill.exec.planner.sql.DrillSqlAggOperatorWithoutInference;
-import org.apache.drill.exec.planner.sql.DrillSqlOperator;
-
-import com.google.common.collect.ArrayListMultimap;
-import org.apache.drill.exec.planner.sql.DrillSqlOperatorWithoutInference;
-
-/**
- * Registry of Drill functions.
- */
-public class DrillFunctionRegistry {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFunctionRegistry.class);
-
-  // key: function name (lowercase) value: list of functions with that name
-  private final ArrayListMultimap<String, DrillFuncHolder> registeredFunctions = ArrayListMultimap.create();
-
-  private static final ImmutableMap<String, Pair<Integer, Integer>> registeredFuncNameToArgRange = ImmutableMap.<String, Pair<Integer, Integer>> builder()
-      // CONCAT is allowed to take [1, infinity) number of arguments.
-      // Currently, this flexibility is offered by DrillOptiq to rewrite it as
-      // a nested structure
-      .put("CONCAT", Pair.of(1, Integer.MAX_VALUE))
-
-      // When LENGTH is given two arguments, this function relies on DrillOptiq to rewrite it as
-      // another function based on the second argument (encodingType)
-      .put("LENGTH", Pair.of(1, 2))
-
-      // Dummy functions
-      .put("CONVERT_TO", Pair.of(2, 2))
-      .put("CONVERT_FROM", Pair.of(2, 2))
-      .put("FLATTEN", Pair.of(1, 1)).build();
-
-  public DrillFunctionRegistry(ScanResult classpathScan) {
-    FunctionConverter converter = new FunctionConverter();
-    List<AnnotatedClassDescriptor> providerClasses = classpathScan.getAnnotatedClasses();
-
-    // Hash map to prevent registering functions with exactly matching signatures
-    // key: Function Name + Input's Major Type
-    // value: Class name where function is implemented
-    //
-    final Map<String, String> functionSignatureMap = new HashMap<>();
-    for (AnnotatedClassDescriptor func : providerClasses) {
-      DrillFuncHolder holder = converter.getHolder(func);
-      if (holder != null) {
-        // register handle for each name the function can be referred to
-        String[] names = holder.getRegisteredNames();
-
-        // Create the string for input types
-        String functionInput = "";
-        for (DrillFuncHolder.ValueReference ref : holder.parameters) {
-          functionInput += ref.getType().toString();
-        }
-        for (String name : names) {
-          String functionName = name.toLowerCase();
-          registeredFunctions.put(functionName, holder);
-          String functionSignature = functionName + functionInput;
-          String existingImplementation;
-          if ((existingImplementation = functionSignatureMap.get(functionSignature)) != null) {
-            throw new AssertionError(
-                String.format(
-                    "Conflicting functions with similar signature found. Func Name: %s, Class name: %s " +
-                " Class name: %s", functionName, func.getClassName(), existingImplementation));
-          } else if (holder.isAggregating() && !holder.isDeterministic() ) {
-            logger.warn("Aggregate functions must be deterministic, did not register function {}", func.getClassName());
-          } else {
-            functionSignatureMap.put(functionSignature, func.getClassName());
-          }
-        }
-      } else {
-        logger.warn("Unable to initialize function for class {}", func.getClassName());
-      }
-    }
-    if (logger.isTraceEnabled()) {
-      StringBuilder allFunctions = new StringBuilder();
-      for (DrillFuncHolder method: registeredFunctions.values()) {
-        allFunctions.append(method.toString()).append("\n");
-      }
-      logger.trace("Registered functions: [\n{}]", allFunctions);
-    }
-  }
-
-  public int size(){
-    return registeredFunctions.size();
-  }
-
-  /** Returns functions with given name. Function name is case insensitive. */
-  public List<DrillFuncHolder> getMethods(String name) {
-    return this.registeredFunctions.get(name.toLowerCase());
-  }
-
-  public void register(DrillOperatorTable operatorTable) {
-    registerOperatorsWithInference(operatorTable);
-    registerOperatorsWithoutInference(operatorTable);
-  }
-
-  private void registerOperatorsWithInference(DrillOperatorTable operatorTable) {
-    final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = Maps.newHashMap();
-    final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = Maps.newHashMap();
-    for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.asMap().entrySet()) {
-      final ArrayListMultimap<Pair<Integer, Integer>, DrillFuncHolder> functions = ArrayListMultimap.create();
-      final ArrayListMultimap<Integer, DrillFuncHolder> aggregateFunctions = ArrayListMultimap.create();
-      final String name = function.getKey().toUpperCase();
-      boolean isDeterministic = true;
-      for (DrillFuncHolder func : function.getValue()) {
-        final int paramCount = func.getParamCount();
-        if(func.isAggregating()) {
-          aggregateFunctions.put(paramCount, func);
-        } else {
-          final Pair<Integer, Integer> argNumberRange;
-          if(registeredFuncNameToArgRange.containsKey(name)) {
-            argNumberRange = registeredFuncNameToArgRange.get(name);
-          } else {
-            argNumberRange = Pair.of(func.getParamCount(), func.getParamCount());
-          }
-          functions.put(argNumberRange, func);
-        }
-
-        if(!func.isDeterministic()) {
-          isDeterministic = false;
-        }
-      }
-      for (Entry<Pair<Integer, Integer>, Collection<DrillFuncHolder>> entry : functions.asMap().entrySet()) {
-        final Pair<Integer, Integer> range = entry.getKey();
-        final int max = range.getRight();
-        final int min = range.getLeft();
-        if(!map.containsKey(name)) {
-          map.put(name, new DrillSqlOperator.DrillSqlOperatorBuilder()
-              .setName(name));
-        }
-
-        final DrillSqlOperator.DrillSqlOperatorBuilder drillSqlOperatorBuilder = map.get(name);
-        drillSqlOperatorBuilder
-            .addFunctions(entry.getValue())
-            .setArgumentCount(min, max)
-            .setDeterministic(isDeterministic);
-      }
-      for (Entry<Integer, Collection<DrillFuncHolder>> entry : aggregateFunctions.asMap().entrySet()) {
-        if(!mapAgg.containsKey(name)) {
-          mapAgg.put(name, new DrillSqlAggOperator.DrillSqlAggOperatorBuilder().setName(name));
-        }
-
-        final DrillSqlAggOperator.DrillSqlAggOperatorBuilder drillSqlAggOperatorBuilder = mapAgg.get(name);
-        drillSqlAggOperatorBuilder
-            .addFunctions(entry.getValue())
-            .setArgumentCount(entry.getKey(), entry.getKey());
-      }
-    }
-
-    for(final Entry<String, DrillSqlOperator.DrillSqlOperatorBuilder> entry : map.entrySet()) {
-      operatorTable.addOperatorWithInference(
-          entry.getKey(),
-          entry.getValue().build());
-    }
-
-    for(final Entry<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> entry : mapAgg.entrySet()) {
-      operatorTable.addOperatorWithInference(
-          entry.getKey(),
-          entry.getValue().build());
-    }
-  }
-
-  private void registerOperatorsWithoutInference(DrillOperatorTable operatorTable) {
-    SqlOperator op;
-    for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.asMap().entrySet()) {
-      Set<Integer> argCounts = Sets.newHashSet();
-      String name = function.getKey().toUpperCase();
-      for (DrillFuncHolder func : function.getValue()) {
-        if (argCounts.add(func.getParamCount())) {
-          if (func.isAggregating()) {
-            op = new DrillSqlAggOperatorWithoutInference(name, func.getParamCount());
-          } else {
-            boolean isDeterministic;
-            // prevent Drill from folding constant functions with types that cannot be materialized
-            // into literals
-            if (DrillConstExecutor.NON_REDUCIBLE_TYPES.contains(func.getReturnType().getMinorType())) {
-              isDeterministic = false;
-            } else {
-              isDeterministic = func.isDeterministic();
-            }
-            op = new DrillSqlOperatorWithoutInference(name, func.getParamCount(), func.getReturnType(), isDeterministic);
-          }
-          operatorTable.addOperatorWithoutInference(function.getKey(), op);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
index 78e4c62..655f571 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
@@ -40,10 +40,14 @@ public class DrillSimpleFuncHolder extends DrillFuncHolder {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSimpleFuncHolder.class);
 
   private final String drillFuncClass;
+  // each function should be wrapped unique class loader associated with its jar
+  // to prevent classpath collisions during loading and unloading jars
+  private final ClassLoader classLoader;
 
   public DrillSimpleFuncHolder(FunctionAttributes functionAttributes, FunctionInitializer initializer) {
     super(functionAttributes, initializer);
     drillFuncClass = checkNotNull(initializer.getClassName());
+    classLoader = checkNotNull(initializer.getClassLoader());
   }
 
   private String setupBody() {
@@ -65,7 +69,7 @@ public class DrillSimpleFuncHolder extends DrillFuncHolder {
   }
 
   public DrillSimpleFunc createInterpreter() throws Exception {
-    return (DrillSimpleFunc)Class.forName(drillFuncClass).newInstance();
+    return (DrillSimpleFunc)Class.forName(drillFuncClass, true, classLoader).newInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
index 00be7aa..2f606e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
@@ -50,7 +50,7 @@ import com.google.common.collect.Lists;
 public class FunctionConverter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionConverter.class);
 
-  public <T extends DrillFunc> DrillFuncHolder getHolder(AnnotatedClassDescriptor func) {
+  public <T extends DrillFunc> DrillFuncHolder getHolder(AnnotatedClassDescriptor func, ClassLoader classLoader) {
     FunctionTemplate template = func.getAnnotationProxy(FunctionTemplate.class);
     if (template == null) {
       return failure("Class does not declare FunctionTemplate annotation.", func);
@@ -173,7 +173,7 @@ public class FunctionConverter {
       return failure("This function declares zero output fields.  A function must declare one output field.", func);
     }
 
-    FunctionInitializer initializer = new FunctionInitializer(func.getClassName());
+    FunctionInitializer initializer = new FunctionInitializer(func.getClassName(), classLoader);
     try{
       // return holder
       ValueReference[] ps = params.toArray(new ValueReference[params.size()]);

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 5d26325..ede255a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -17,38 +17,69 @@
  */
 package org.apache.drill.exec.expr.fn;
 
+import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLConnection;
+import java.util.Enumeration;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.config.CommonConstants;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.fn.CastFunctions;
 import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.RunTimeScan;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.store.TransientStoreEvent;
+import org.apache.drill.exec.coord.store.TransientStoreListener;
+import org.apache.drill.exec.exception.FunctionValidationException;
+import org.apache.drill.exec.exception.JarValidationException;
+import org.apache.drill.exec.expr.fn.registry.LocalFunctionRegistry;
+import org.apache.drill.exec.expr.fn.registry.JarScan;
+import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
+import org.apache.drill.exec.proto.UserBitShared.Jar;
 import org.apache.drill.exec.resolver.FunctionResolver;
 import org.apache.drill.exec.server.options.OptionManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.JarUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * This class offers the registry for functions. Notably, in addition to Drill its functions
- * (in {@link DrillFunctionRegistry}), other PluggableFunctionRegistry (e.g., {@link org.apache.drill.exec.expr.fn.HiveFunctionRegistry})
+ * (in {@link LocalFunctionRegistry}), other PluggableFunctionRegistry (e.g., {@link org.apache.drill.exec.expr.fn.HiveFunctionRegistry})
  * is also registered in this class
  */
-public class FunctionImplementationRegistry implements FunctionLookupContext {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionImplementationRegistry.class);
+public class FunctionImplementationRegistry implements FunctionLookupContext, AutoCloseable {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionImplementationRegistry.class);
 
-  private DrillFunctionRegistry drillFuncRegistry;
+  private final LocalFunctionRegistry localFunctionRegistry;
+  private final RemoteFunctionRegistry remoteFunctionRegistry;
+  private final Path localUdfDir;
+  private boolean deleteTmpDir = false;
+  private File tmpDir;
   private List<PluggableFunctionRegistry> pluggableFuncRegistries = Lists.newArrayList();
   private OptionManager optionManager = null;
 
@@ -61,7 +92,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
     Stopwatch w = Stopwatch.createStarted();
 
     logger.debug("Generating function registry.");
-    drillFuncRegistry = new DrillFunctionRegistry(classpathScan);
+    localFunctionRegistry = new LocalFunctionRegistry(classpathScan);
 
     Set<Class<? extends PluggableFunctionRegistry>> registryClasses =
         classpathScan.getImplementations(PluggableFunctionRegistry.class);
@@ -85,7 +116,9 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
         break;
       }
     }
-    logger.info("Function registry loaded.  {} functions loaded in {} ms.", drillFuncRegistry.size(), w.elapsed(TimeUnit.MILLISECONDS));
+    logger.info("Function registry loaded.  {} functions loaded in {} ms.", localFunctionRegistry.size(), w.elapsed(TimeUnit.MILLISECONDS));
+    this.remoteFunctionRegistry = new RemoteFunctionRegistry(new UnregistrationListener());
+    this.localUdfDir = getLocalUdfDir(config);
   }
 
   public FunctionImplementationRegistry(DrillConfig config, ScanResult classpathScan, OptionManager optionManager) {
@@ -99,7 +132,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
    */
   public void register(DrillOperatorTable operatorTable) {
     // Register Drill functions first and move to pluggable function registries.
-    drillFuncRegistry.register(operatorTable);
+    localFunctionRegistry.register(operatorTable);
 
     for(PluggableFunctionRegistry registry : pluggableFuncRegistries) {
       registry.register(operatorTable);
@@ -109,14 +142,26 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
   /**
    * Using the given <code>functionResolver</code> find Drill function implementation for given
    * <code>functionCall</code>
-   *
-   * @param functionResolver
-   * @param functionCall
-   * @return
+   * If function implementation was not found and in case if Dynamic UDF Support is enabled
+   * loads all missing remote functions and tries to find Drill implementation one more time.
    */
   @Override
   public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) {
-    return functionResolver.getBestMatch(drillFuncRegistry.getMethods(functionReplacement(functionCall)), functionCall);
+    return findDrillFunction(functionResolver, functionCall, true);
+  }
+
+  private DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall, boolean retry) {
+    AtomicLong version = new AtomicLong();
+    DrillFuncHolder holder = functionResolver.getBestMatch(
+        localFunctionRegistry.getMethods(functionReplacement(functionCall), version), functionCall);
+    if (holder == null && retry) {
+      if (optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
+        if (loadRemoteFunctions(version.get())) {
+          findDrillFunction(functionResolver, functionCall, false);
+        }
+      }
+    }
+    return holder;
   }
 
   // Check if this Function Replacement is needed; if yes, return a new name. otherwise, return the original name
@@ -138,18 +183,26 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
 
   /**
    * Find the Drill function implementation that matches the name, arg types and return type.
-   * @param name
-   * @param argTypes
-   * @param returnType
-   * @return
+   * If exact function implementation was not found and in case if Dynamic UDF Support is enabled
+   * loads all missing remote functions and tries to find Drill implementation one more time.
    */
   public DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType) {
-    for (DrillFuncHolder h : drillFuncRegistry.getMethods(name)) {
+    return findExactMatchingDrillFunction(name, argTypes, returnType, true);
+  }
+
+  private DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType, boolean retry) {
+    AtomicLong version = new AtomicLong();
+    for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) {
       if (h.matches(returnType, argTypes)) {
         return h;
       }
     }
 
+    if (retry && optionManager != null && optionManager.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
+      if (loadRemoteFunctions(version.get())) {
+        findExactMatchingDrillFunction(name, argTypes, returnType, false);
+      }
+    }
     return null;
   }
 
@@ -177,7 +230,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
 
   // Method to find if the output type of a drill function if of complex type
   public boolean isFunctionComplexOutput(String name) {
-    List<DrillFuncHolder> methods = drillFuncRegistry.getMethods(name);
+    List<DrillFuncHolder> methods = localFunctionRegistry.getMethods(name);
     for (DrillFuncHolder holder : methods) {
       if (holder.getReturnValue().isComplexWriter()) {
         return true;
@@ -186,4 +239,257 @@ public class FunctionImplementationRegistry implements FunctionLookupContext {
     return false;
   }
 
+  public RemoteFunctionRegistry getRemoteFunctionRegistry() {
+    return remoteFunctionRegistry;
+  }
+
+  /**
+   * Using given local path to jar creates unique class loader for this jar.
+   * Class loader is closed to release opened connection to jar when validation is finished.
+   * Scan jar content to receive list of all scanned classes
+   * and starts validation process against local function registry.
+   * Checks if received list of validated function is not empty.
+   *
+   * @param path local path to jar we need to validate
+   * @return list of validated function signatures
+   */
+  public List<String> validate(Path path) throws IOException {
+    URL url = path.toUri().toURL();
+    URL[] urls = {url};
+    try (URLClassLoader classLoader = new URLClassLoader(urls)) {
+      ScanResult jarScanResult = scan(classLoader, path, urls);
+      List<String> functions = localFunctionRegistry.validate(path.getName(), jarScanResult);
+      if (functions.isEmpty()) {
+        throw new FunctionValidationException(String.format("Jar %s does not contain functions", path.getName()));
+      }
+      return functions;
+    }
+  }
+
+  /**
+   * Attempts to load and register functions from remote function registry.
+   * First checks if there is no missing jars.
+   * If yes, enters synchronized block to prevent other loading the same jars.
+   * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock).
+   * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
+   * Jar registration timestamp represented in milliseconds is used as suffix.
+   * Then registers all jars at the same time. Returns true when finished.
+   * In case if any errors during jars coping or registration, logs errors and proceeds.
+   *
+   * If no missing jars are found, checks current local registry version.
+   * Returns false if versions match, true otherwise.
+   *
+   * @param version local function registry version
+   * @return true if new jars were registered or local function registry version is different, false otherwise
+   */
+  public boolean loadRemoteFunctions(long version) {
+    List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
+    if (!missingJars.isEmpty()) {
+      synchronized (this) {
+        missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
+        List<JarScan> jars = Lists.newArrayList();
+        for (String jarName : missingJars) {
+          Path binary = null;
+          Path source = null;
+          URLClassLoader classLoader = null;
+          try {
+            binary = copyJarToLocal(jarName, remoteFunctionRegistry);
+            source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
+            URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
+            classLoader = new URLClassLoader(urls);
+            ScanResult scanResult = scan(classLoader, binary, urls);
+            localFunctionRegistry.validate(jarName, scanResult);
+            jars.add(new JarScan(jarName, scanResult, classLoader));
+          } catch (Exception e) {
+            deleteQuietlyLocalJar(binary);
+            deleteQuietlyLocalJar(source);
+            if (classLoader != null) {
+              try {
+                classLoader.close();
+              } catch (Exception ex) {
+                logger.warn("Problem during closing class loader for {}", jarName, e);
+              }
+            }
+            logger.error("Problem during remote functions load from {}", jarName, e);
+          }
+        }
+        if (!jars.isEmpty()) {
+          localFunctionRegistry.register(jars);
+          return true;
+        }
+      }
+    }
+    return version != localFunctionRegistry.getVersion();
+  }
+
+  /**
+   * First finds path to marker file url, otherwise throws {@link JarValidationException}.
+   * Then scans jar classes according to list indicated in marker files.
+   * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}.
+   * This is extremely important for Windows users where system doesn't allow to delete file if it's being used.
+   *
+   * @param classLoader unique class loader for jar
+   * @param path local path to jar
+   * @param urls urls associated with the jar (ex: binary and source)
+   * @return scan result of packages, classes, annotations found in jar
+   */
+  private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException {
+    Enumeration<URL> markerFileEnumeration = classLoader.getResources(
+        CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
+    while (markerFileEnumeration.hasMoreElements()) {
+      URL markerFile = markerFileEnumeration.nextElement();
+      if (markerFile.getPath().contains(path.toUri().getPath())) {
+        URLConnection markerFileConnection = null;
+        try {
+          markerFileConnection = markerFile.openConnection();
+          DrillConfig drillConfig = DrillConfig.create(ConfigFactory.parseURL(markerFile));
+          return RunTimeScan.dynamicPackageScan(drillConfig, Sets.newHashSet(urls));
+        } finally {
+          if (markerFileConnection instanceof JarURLConnection) {
+            ((JarURLConnection) markerFile.openConnection()).getJarFile().close();
+          }
+        }
+      }
+    }
+    throw new JarValidationException(String.format("Marker file %s is missing in %s",
+        CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName()));
+  }
+
+  /**
+   * Return list of jars that are missing in local function registry
+   * but present in remote function registry.
+   *
+   * @param remoteFunctionRegistry remote function registry
+   * @param localFunctionRegistry local function registry
+   * @return list of missing jars
+   */
+  private List<String> getMissingJars(RemoteFunctionRegistry remoteFunctionRegistry,
+                                      LocalFunctionRegistry localFunctionRegistry) {
+    List<Jar> remoteJars = remoteFunctionRegistry.getRegistry().getJarList();
+    List<String> localJars = localFunctionRegistry.getAllJarNames();
+    List<String> missingJars = Lists.newArrayList();
+    for (Jar jar : remoteJars) {
+      if (!localJars.contains(jar.getName())) {
+        missingJars.add(jar.getName());
+      }
+    }
+    return missingJars;
+  }
+
+  /**
+   * Creates local udf directory, if it doesn't exist.
+   * Checks if local udf directory is a directory and if current application has write rights on it.
+   * Attempts to clean up local udf directory in case jars were left after previous drillbit run.
+   * Local udf directory path is concatenated from drill temporary directory and ${drill.exec.udf.directory.local}.
+   *
+   * @param config drill config
+   * @return path to local udf directory
+   */
+  private Path getLocalUdfDir(DrillConfig config) {
+    tmpDir = getTmpDir(config);
+    File udfDir = new File(tmpDir, config.getString(ExecConstants.UDF_DIRECTORY_LOCAL));
+    udfDir.mkdirs();
+    String udfPath = udfDir.getPath();
+    Preconditions.checkState(udfDir.exists(), "Local udf directory [%s] must exist", udfPath);
+    Preconditions.checkState(udfDir.isDirectory(), "Local udf directory [%s] must be a directory", udfPath);
+    Preconditions.checkState(udfDir.canWrite(), "Local udf directory [%s] must be writable for application user", udfPath);
+    try {
+      FileUtils.cleanDirectory(udfDir);
+    } catch (IOException e) {
+      throw new DrillRuntimeException("Error during local udf directory clean up", e);
+    }
+    return new Path(udfDir.toURI());
+  }
+
+  /**
+   * First tries to get drill temporary directory value from environmental variable $DRILL_TMP_DIR,
+   * then from config ${drill.tmp-dir}.
+   * If value is still missing, generates directory using {@link Files#createTempDir()}.
+   * If temporary directory was generated, sets {@link #deleteTmpDir} to true
+   * to delete directory on drillbit exit.
+   * @return drill temporary directory path
+   */
+  private File getTmpDir(DrillConfig config) {
+    String drillTempDir = System.getenv("DRILL_TMP_DIR");
+
+    if (drillTempDir == null && config.hasPath(ExecConstants.DRILL_TMP_DIR)) {
+      drillTempDir = config.getString(ExecConstants.DRILL_TMP_DIR);
+    }
+
+    if (drillTempDir == null) {
+      deleteTmpDir = true;
+      return Files.createTempDir();
+    }
+
+    return new File(drillTempDir);
+  }
+
+  /**
+   * Copies jar from remote udf area to local udf area.
+   *
+   * @param jarName jar name to be copied
+   * @param remoteFunctionRegistry remote function registry
+   * @return local path to jar that was copied
+   * @throws IOException in case of problems during jar coping process
+   */
+  private Path copyJarToLocal(String jarName, RemoteFunctionRegistry remoteFunctionRegistry) throws IOException {
+    Path registryArea = remoteFunctionRegistry.getRegistryArea();
+    FileSystem fs = remoteFunctionRegistry.getFs();
+    Path remoteJar = new Path(registryArea, jarName);
+    Path localJar = new Path(localUdfDir, jarName);
+    try {
+      fs.copyToLocalFile(remoteJar, localJar);
+    } catch (IOException e) {
+      String message = String.format("Error during jar [%s] coping from [%s] to [%s]",
+          jarName, registryArea.toUri().getPath(), localUdfDir.toUri().getPath());
+      throw new IOException(message, e);
+    }
+    return localJar;
+  }
+
+  /**
+   * Deletes quietly local jar but first checks if path to jar is not null.
+   *
+   * @param jar path to jar
+   */
+  private void deleteQuietlyLocalJar(Path jar) {
+    if (jar != null) {
+      FileUtils.deleteQuietly(new File(jar.toUri().getPath()));
+    }
+  }
+
+  /**
+   * If {@link #deleteTmpDir} is set to true, deletes generated temporary directory.
+   * Otherwise cleans up {@link #localUdfDir}.
+   */
+  @Override
+  public void close() {
+    if (deleteTmpDir) {
+      FileUtils.deleteQuietly(tmpDir);
+    } else {
+      try {
+        FileUtils.cleanDirectory(new File(localUdfDir.toUri().getPath()));
+      } catch (IOException e) {
+        logger.warn("Problems during local udf directory clean up", e);
+      }
+    }
+  }
+
+  /**
+   * Fires when jar name is submitted for unregistration.
+   * Will unregister all functions associated with the jar name
+   * and delete binary and source associated with the jar from local udf directory
+   */
+  public class UnregistrationListener implements TransientStoreListener {
+
+    @Override
+    public void onChange(TransientStoreEvent event) {
+      String jarName = (String) event.getValue();
+      localFunctionRegistry.unregister(jarName);
+      String localDir = localUdfDir.toUri().getPath();
+      FileUtils.deleteQuietly(new File(localDir, jarName));
+      FileUtils.deleteQuietly(new File(localDir, JarUtil.getSourceName(jarName)));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
index 1007afc..4e5ee4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.expr.fn;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringReader;
-import java.net.URL;
 import java.util.List;
 import java.util.Map;
 
@@ -33,7 +32,6 @@ import org.codehaus.janino.Scanner;
 import org.mortbay.util.IO;
 
 import com.google.common.collect.Maps;
-import com.google.common.io.Resources;
 
 /**
  * To avoid the cost of initializing all functions up front,
@@ -42,8 +40,8 @@ import com.google.common.io.Resources;
 public class FunctionInitializer {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionInitializer.class);
 
-  private String className;
-
+  private final String className;
+  private final ClassLoader classLoader;
   private Map<String, CompilationUnit> functionUnits = Maps.newHashMap();
   private Map<String, String> methods;
   private List<String> imports;
@@ -51,13 +49,21 @@ public class FunctionInitializer {
 
   /**
    * @param className the fully qualified name of the class implementing the function
+   * @param classLoader class loader associated with the function, is unique for each jar that holds function
+   *                    to prevent classpath collisions during loading an unloading jars
    */
-  public FunctionInitializer(String className) {
+  public FunctionInitializer(String className, ClassLoader classLoader) {
     super();
     this.className = className;
+    this.classLoader = classLoader;
   }
 
   /**
+   * @return returns class loader
+   */
+  public ClassLoader getClassLoader() { return classLoader; }
+
+  /**
    * @return the fully qualified name of the class implementing the function
    */
   public String getClassName() {
@@ -94,7 +100,7 @@ public class FunctionInitializer {
       // get function body.
 
       try {
-        final Class<?> clazz = Class.forName(className);
+        final Class<?> clazz = Class.forName(className, true, classLoader);
         final CompilationUnit cu = get(clazz);
 
         if (cu == null) {
@@ -123,8 +129,7 @@ public class FunctionInitializer {
       return cu;
     }
 
-    URL u = Resources.getResource(c, path);
-    try (InputStream is = Resources.asByteSource(u).openStream()) {
+    try (InputStream is = c.getResourceAsStream(path)) {
       if (is == null) {
         throw new IOException(String.format(
             "Failure trying to located source code for Class %s, tried to read on classpath location %s", c.getName(),

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java
new file mode 100644
index 0000000..4b93c88
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionHolder.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+
+/**
+ * Holder class that contains:
+ * <ol>
+ *   <li>function name</li>
+ *   <li>function signature which is string representation of function name and its input parameters</li>
+ *   <li>{@link DrillFuncHolder} associated with the function</li>
+ * </ol>
+ */
+public class FunctionHolder {
+
+  private final String name;
+  private final String signature;
+  private final DrillFuncHolder holder;
+
+  public FunctionHolder(String name, String signature, DrillFuncHolder holder) {
+    this.name = name;
+    this.signature = signature;
+    this.holder = holder;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public DrillFuncHolder getHolder() {
+    return holder;
+  }
+
+  public String getSignature() {
+    return signature;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
new file mode 100644
index 0000000..005c4e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.registry;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Function registry holder stores function implementations by jar name, function name.
+ * Contains two maps that hold data by jars and functions respectively.
+ * Jars map contains each jar as a key and map of all its functions with collection of function signatures as value.
+ * Functions map contains function name as key and map of its signatures and function holder as value.
+ * All maps and collections used are concurrent to guarantee memory consistency effects.
+ * Such structure is chosen to achieve maximum speed while retrieving data by jar or by function name,
+ * since we expect infrequent registry changes.
+ * Holder is designed to allow concurrent reads and single writes to keep data consistent.
+ * This is achieved by {@link ReadWriteLock} implementation usage.
+ * Holder has number version which changes every time new jars are added or removed. Initial version number is 0.
+ * Also version is used when user needs data from registry with version it is based on.
+ *
+ * Structure example:
+ *
+ * JARS
+ * built-in   -> upper          -> upper(VARCHAR-REQUIRED)
+ *            -> lower          -> lower(VARCHAR-REQUIRED)
+ *
+ * First.jar  -> upper          -> upper(VARCHAR-OPTIONAL)
+ *            -> custom_upper   -> custom_upper(VARCHAR-REQUIRED)
+ *                              -> custom_upper(VARCHAR-OPTIONAL)
+ *
+ * Second.jar -> lower          -> lower(VARCHAR-OPTIONAL)
+ *            -> custom_upper   -> custom_upper(VARCHAR-REQUIRED)
+ *                              -> custom_upper(VARCHAR-OPTIONAL)
+ *
+ * FUNCTIONS
+ * upper        -> upper(VARCHAR-REQUIRED)        -> function holder for upper(VARCHAR-REQUIRED)
+ *              -> upper(VARCHAR-OPTIONAL)        -> function holder for upper(VARCHAR-OPTIONAL)
+ *
+ * lower        -> lower(VARCHAR-REQUIRED)        -> function holder for lower(VARCHAR-REQUIRED)
+ *              -> lower(VARCHAR-OPTIONAL)        -> function holder for lower(VARCHAR-OPTIONAL)
+ *
+ * custom_upper -> custom_upper(VARCHAR-REQUIRED) -> function holder for custom_upper(VARCHAR-REQUIRED)
+ *              -> custom_upper(VARCHAR-OPTIONAL) -> function holder for custom_upper(VARCHAR-OPTIONAL)
+ *
+ * custom_lower -> custom_lower(VARCHAR-REQUIRED) -> function holder for custom_lower(VARCHAR-REQUIRED)
+ *              -> custom_lower(VARCHAR-OPTIONAL) -> function holder for custom_lower(VARCHAR-OPTIONAL)
+ *
+ * where
+ * First.jar is jar name represented by String
+ * upper is function name represented by String
+ * upper(VARCHAR-REQUIRED) is signature name represented by String which consist of function name, list of input parameters
+ * function holder for upper(VARCHAR-REQUIRED) is {@link DrillFuncHolder} initiated for each function.
+ *
+ */
+public class FunctionRegistryHolder {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionRegistryHolder.class);
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
+  private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
+  private long version = 0;
+
+  // jar name, Map<function name, Queue<function signature>
+  private final Map<String, Map<String, Queue<String>>> jars;
+
+  // function name, Map<function signature, function holder>
+  private final Map<String, Map<String, DrillFuncHolder>> functions;
+
+  public FunctionRegistryHolder() {
+    this.functions = Maps.newConcurrentMap();
+    this.jars = Maps.newConcurrentMap();
+  }
+
+  /**
+   * This is read operation, so several users at a time can get this data.
+   * @return local function registry version number
+   */
+  public long getVersion() {
+    try (AutoCloseableLock lock = readLock.open()) {
+      return version;
+    }
+  }
+
+  /**
+   * Adds jars to the function registry.
+   * If jar with the same name already exists, it and its functions will be removed.
+   * Then jar will be added to {@link #jars}
+   * and each function will be added using {@link #addFunctions(Map, List)}.
+   * Function version registry will be incremented by 1 if at least one jar was added but not for each jar.
+   * This is write operation, so one user at a time can call perform such action,
+   * others will wait till first user completes his action.
+   *
+   * @param newJars jars and list of their function holders, each contains function name, signature and holder
+   */
+  public void addJars(Map<String, List<FunctionHolder>> newJars) {
+    try (AutoCloseableLock lock = writeLock.open()) {
+      for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) {
+        String jarName = newJar.getKey();
+        removeAllByJar(jarName);
+        Map<String, Queue<String>> jar = Maps.newConcurrentMap();
+        jars.put(jarName, jar);
+        addFunctions(jar, newJar.getValue());
+      }
+      if (!newJars.isEmpty()) {
+        version++;
+      }
+    }
+  }
+
+  /**
+   * Removes jar from {@link #jars} and all associated with jar functions from {@link #functions}
+   * If jar was removed, function registry version will be incremented by 1.
+   * This is write operation, so one user at a time can call perform such action,
+   * others will wait till first user completes his action.
+   *
+   * @param jarName jar name to be removed
+   */
+  public void removeJar(String jarName) {
+    try (AutoCloseableLock lock = writeLock.open()) {
+      if (removeAllByJar(jarName)) {
+        version++;
+      }
+    }
+  }
+
+  /**
+   * Retrieves list of all jars name present in {@link #jars}
+   * This is read operation, so several users can get this data.
+   *
+   * @return list of all jar names
+   */
+  public List<String> getAllJarNames() {
+    try (AutoCloseableLock lock = readLock.open()) {
+      return Lists.newArrayList(jars.keySet());
+    }
+  }
+
+  /**
+   * Retrieves all function names associated with the jar from {@link #jars}.
+   * Returns empty list if jar is not registered.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @param jarName jar name
+   * @return list of functions names associated from the jar
+   */
+  public List<String> getFunctionNamesByJar(String jarName) {
+    try  (AutoCloseableLock lock = readLock.open()){
+      Map<String, Queue<String>> functions = jars.get(jarName);
+      return functions == null ? Lists.<String>newArrayList() : Lists.newArrayList(functions.keySet());
+    }
+  }
+
+  /**
+   * Returns list of functions with list of function holders for each functions.
+   * Uses guava {@link ListMultimap} structure to return data.
+   * If no functions present, will return empty {@link ListMultimap}.
+   * If version holder is not null, updates it with current registry version number.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @param version version holder
+   * @return all functions which their holders
+   */
+  public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicLong version) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      if (version != null) {
+        version.set(this.version);
+      }
+      ListMultimap<String, DrillFuncHolder> functionsWithHolders = ArrayListMultimap.create();
+      for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) {
+        functionsWithHolders.putAll(function.getKey(), Lists.newArrayList(function.getValue().values()));
+      }
+      return functionsWithHolders;
+    }
+  }
+
+  /**
+   * Returns list of functions with list of function holders for each functions without version number.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @return all functions which their holders
+   */
+  public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders() {
+    return getAllFunctionsWithHolders(null);
+  }
+
+  /**
+   * Returns list of functions with list of function signatures for each functions.
+   * Uses guava {@link ListMultimap} structure to return data.
+   * If no functions present, will return empty {@link ListMultimap}.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @return all functions which their signatures
+   */
+  public ListMultimap<String, String> getAllFunctionsWithSignatures() {
+    try (AutoCloseableLock lock = readLock.open()) {
+      ListMultimap<String, String> functionsWithSignatures = ArrayListMultimap.create();
+      for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) {
+        functionsWithSignatures.putAll(function.getKey(), Lists.newArrayList(function.getValue().keySet()));
+      }
+      return functionsWithSignatures;
+    }
+  }
+
+  /**
+   * Returns all function holders associated with function name.
+   * If function is not present, will return empty list.
+   * If version holder is not null, updates it with current registry version number.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @param functionName function name
+   * @param version version holder
+   * @return list of function holders
+   */
+  public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicLong version) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      if (version != null) {
+        version.set(this.version);
+      }
+      Map<String, DrillFuncHolder> holders = functions.get(functionName);
+      return holders == null ? Lists.<DrillFuncHolder>newArrayList() : Lists.newArrayList(holders.values());
+    }
+  }
+
+  /**
+   * Returns all function holders associated with function name without version number.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @param functionName function name
+   * @return list of function holders
+   */
+  public List<DrillFuncHolder> getHoldersByFunctionName(String functionName) {
+    return getHoldersByFunctionName(functionName, null);
+  }
+
+  /**
+   * Checks is jar is present in {@link #jars}.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @param jarName jar name
+   * @return true if jar exists, else false
+   */
+  public boolean containsJar(String jarName) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      return jars.containsKey(jarName);
+    }
+  }
+
+  /**
+   * Returns quantity of functions stored in {@link #functions}.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @return quantity of functions
+   */
+  public int functionsSize() {
+    try (AutoCloseableLock lock = readLock.open()) {
+      return functions.size();
+    }
+  }
+
+  /**
+   * Looks which jar in {@link #jars} contains passed function signature.
+   * First looks by function name and if found checks if such function has passed function signature.
+   * Returns jar name if found matching function signature, else null.
+   * This is read operation, so several users can perform this operation at the same time.
+   *
+   * @param functionName function name
+   * @param functionSignature function signature
+   * @return jar name
+   */
+  public String getJarNameByFunctionSignature(String functionName, String functionSignature) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      for (Map.Entry<String, Map<String, Queue<String>>> jar : jars.entrySet()) {
+        Queue<String> functionSignatures = jar.getValue().get(functionName);
+        if (functionSignatures != null && functionSignatures.contains(functionSignature)) {
+          return jar.getKey();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Adds all function names and signatures to passed jar,
+   * adds all function names, their signatures and holders to {@link #functions}.
+   *
+   * @param jar jar where function to be added
+   * @param newFunctions collection of function holders, each contains function name, signature and holder.
+   */
+  private void addFunctions(Map<String, Queue<String>> jar, List<FunctionHolder> newFunctions) {
+    for (FunctionHolder function : newFunctions) {
+      final String functionName = function.getName();
+      Queue<String> jarFunctions = jar.get(functionName);
+      if (jarFunctions == null) {
+        jarFunctions = Queues.newConcurrentLinkedQueue();;
+        jar.put(functionName, jarFunctions);
+      }
+      final String functionSignature = function.getSignature();
+      jarFunctions.add(functionSignature);
+
+      Map<String, DrillFuncHolder> signatures = functions.get(functionName);
+      if (signatures == null) {
+        signatures = Maps.newConcurrentMap();
+        functions.put(functionName, signatures);
+      }
+      signatures.put(functionSignature, function.getHolder());
+    }
+  }
+
+  /**
+   * Removes jar from {@link #jars} and all associated with jars functions from {@link #functions}
+   * Since each jar is loaded with separate class loader before
+   * removing we need to close class loader to release opened connection to jar.
+   * All jar functions have the same class loader, so we need to close only one time.
+   *
+   * @param jarName jar name to be removed
+   * @return true if jar was removed, false otherwise
+   */
+  private boolean removeAllByJar(String jarName) {
+    Map<String, Queue<String>> jar = jars.remove(jarName);
+    if (jar == null) {
+      return false;
+    }
+
+    for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) {
+      final String function = functionEntry.getKey();
+      Map<String, DrillFuncHolder> functionHolders = functions.get(function);
+      Queue<String> functionSignatures = functionEntry.getValue();
+      for (Map.Entry<String, DrillFuncHolder> entry : functionHolders.entrySet()) {
+        if (functionSignatures.contains(entry.getKey())) {
+          ClassLoader classLoader = entry.getValue().getClassLoader();
+          if (classLoader instanceof AutoCloseable) {
+            try {
+              ((AutoCloseable) classLoader).close();
+            } catch (Exception e) {
+              logger.warn("Problem during closing class loader", e);
+            }
+          }
+          break;
+        }
+      }
+      functionHolders.keySet().removeAll(functionSignatures);
+
+      if (functionHolders.isEmpty()) {
+        functions.remove(function);
+      }
+    }
+    return true;
+  }
+}