Posted to dev@drill.apache.org by arina-ielchiieva <gi...@git.apache.org> on 2016/12/21 14:44:02 UTC

[GitHub] drill pull request #701: DRILL-4963: Sync remote and local function registri...

GitHub user arina-ielchiieva opened a pull request:

    https://github.com/apache/drill/pull/701

    DRILL-4963: Sync remote and local function registries before query execution
    
    Lazy-init was previously performed only when a function was not found during Calcite parsing, but DRILL-4963 shows cases where Calcite parsing passes (usually with function overloading) and yet the function is still not found. To handle such cases, we need to sync the remote and local function registries before query execution. To make this sync as lightweight as possible, we first compare the remote and local function registry versions and look for missing jars only when the versions do not match. Here, the local function registry version means the remote function registry version with which the local function registry was last synchronized.
    
    Changes:
    1. Added a `consists` method to the PersistentStore interface that returns true if a key exists in the store and false otherwise. This method is needed to retrieve only the remote function registry version without its content (unlike `get`). We pull the remote function registry content only if the versions differ.
    2. Added a check that the remote and local function registries are in sync before query execution, at both the planning and execution stages.
    3. Removed unused methods and code related to the failure-only lazy-init implementation.
    4. Added additional debug messages for `CreateFunctionHandler` and `DropFunctionHandler`.
    5. Updated unit tests to reflect new changes.
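    A minimal sketch of the version-first check described above, assuming a remote registry that can report its version cheaply (the content-free read that `consists` is meant to enable) and can return its content together with the version it was read at. `VersionFirstSync`, `RemoteRegistry`, `Snapshot` and `LocalRegistry` are hypothetical names, not Drill classes; the sketch omits locking, jar download, and dropped jars.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of a version-first registry sync. All names are hypothetical, not Drill's API.
final class VersionFirstSync {

  /** Remote jars plus the remote version they were read at. */
  static final class Snapshot {
    final long version;
    final List<String> jars;

    Snapshot(long version, List<String> jars) {
      this.version = version;
      this.jars = jars;
    }
  }

  /** Hypothetical remote registry: a cheap version read and an expensive content read. */
  interface RemoteRegistry {
    long getVersion();      // cheap: version only, no content

    Snapshot getSnapshot(); // expensive: full registry content
  }

  /** Hypothetical local registry that remembers the remote version it last synced with. */
  static final class LocalRegistry {
    final Set<String> jars = new HashSet<>();
    long syncedVersion = -1;
  }

  /** Returns true if content was pulled and compared, false if the version fast path was taken. */
  static boolean syncIfNeeded(RemoteRegistry remote, LocalRegistry local) {
    if (remote.getVersion() == local.syncedVersion) {
      return false; // versions match: the remote content is never read
    }
    Snapshot snapshot = remote.getSnapshot();
    local.jars.addAll(snapshot.jars);       // stand-in for downloading/registering missing jars
    local.syncedVersion = snapshot.version; // version the content was read at, not the earlier probe
    return true;
  }

  /** Fake remote registry that counts expensive content reads. */
  static final class CountingRemote implements RemoteRegistry {
    long version;
    List<String> jars;
    int snapshotReads = 0;

    CountingRemote(long version, List<String> jars) {
      this.version = version;
      this.jars = jars;
    }

    @Override
    public long getVersion() {
      return version;
    }

    @Override
    public Snapshot getSnapshot() {
      snapshotReads++;
      return new Snapshot(version, jars);
    }
  }

  public static void main(String[] args) {
    CountingRemote remote = new CountingRemote(1, List.of("udf-a.jar"));
    LocalRegistry local = new LocalRegistry();
    System.out.println(syncIfNeeded(remote, local)); // true: first call pulls content
    System.out.println(syncIfNeeded(remote, local)); // false: versions match
    System.out.println(remote.snapshotReads);        // 1
  }
}
```

    Recording the version returned with the content, rather than the one from the initial probe, avoids marking the local registry as synced with a version whose jars it never saw.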
    
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arina-ielchiieva/drill DRILL-4963

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/701.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #701
    
----
commit 51ef6614a2c27cb6bb58fb0de875952f99e9b102
Author: Arina Ielchiieva <ar...@gmail.com>
Date:   2016-12-20T16:57:15Z

    DRILL-4963: Sync remote and local function registries before query execution

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/701
  
    @jinfengni 
    
    1. It depends on how often UDFs are added, though we don't expect it to happen often. But you are correct about the overhead for queries that do not use dynamic UDFs.
    2. You are right: the function registry can be checked several times, which can slow down the entire query. It's hard to say how much performance will suffer, as it depends on many factors, such as the number of parallel queries, the number of functions in the query without an exact match, ZK response time, and so on.
    3. A "refresh function registry" command is being considered, but as part of MVCC. It could help with the current approach, but it still could not guarantee that, after the refresh command is issued, all drillbits have synced their local function registries with the remote one, unless the command waited for every drillbit to confirm that its sync was done. But what if one of the drillbits fails to sync? Should the refresh command retry or fail immediately? How long would the user have to wait for the refresh command to finish? With MVCC, the refresh command would only need to guarantee that the current drillbit is in sync, and all of the above questions go away (more in the MVCC doc).
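    To make the open questions in point 3 concrete, here is a minimal sketch of a refresh command that waits for every drillbit's confirmation under a timeout. `RefreshAllSketch`, `refreshAll`, and the per-drillbit `Supplier<Boolean>` sync actions are hypothetical; the point is only that the caller ends up with three outcomes (all synced, some failed, timed out) and must choose a policy for the last two.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical cluster-wide "refresh function registry" that waits for every drillbit.
final class RefreshAllSketch {

  enum Outcome { ALL_SYNCED, SOME_FAILED, TIMED_OUT }

  /**
   * Starts every drillbit's sync concurrently and waits up to timeoutMillis for all of them.
   * Each Supplier stands in for a hypothetical RPC returning true when that drillbit synced.
   */
  static Outcome refreshAll(List<Supplier<Boolean>> drillbitSyncs, long timeoutMillis) {
    List<CompletableFuture<Boolean>> futures = drillbitSyncs.stream()
        .map(sync -> CompletableFuture.supplyAsync(sync))
        .collect(Collectors.toList());
    try {
      CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
          .get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return Outcome.TIMED_OUT;   // open question: retry, or give up and report?
    } catch (ExecutionException e) {
      return Outcome.SOME_FAILED; // at least one sync threw
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return Outcome.TIMED_OUT;   // treated as "not all confirmations received"
    }
    for (CompletableFuture<Boolean> f : futures) {
      if (!Boolean.TRUE.equals(f.join())) {
        return Outcome.SOME_FAILED; // a drillbit reported that it could not sync
      }
    }
    return Outcome.ALL_SYNCED;
  }
}
```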
    
    Anyway, you are right that the current approach only covers the function-overloading gap, is not optimal, and may slow down queries. A refresh command might partially solve the problem as well, but it has its own issues to address. So it's better to dive into MVCC for the optimal implementation.
    
    Regarding this pull request, I don't have strong feelings about whether it should be merged. It would solve the function-overloading problem, but it may impact performance, and it's hard to say by how much since many factors are involved.



[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/701
  
    As I understand it, the issues are these:
    
    * When parsing/planning a query, function references are ambiguous.
    * If a function x() simply does not exist at all, we get a clear signal and can check the function registry for updates.
    * When names are overloaded, there may be many x(arg1) functions: some defined locally, some recently added to the registry.
    * In this case, we get no clear signal that we should check the registry since we might find a good-enough match locally and not know to check for a better match in the registry.
    
    The solution is to check the registry version on each new query. This must be done for every query (with functions) since we can never be certain whether an overload exists in the registry.
    
    The problem is that a check of ZK, even to get a version, adds latency. 99.9% of the time, nothing will have changed. How can we fix that?
    
    Let's take a step back. The original problems we tried to solve were:
    
    If a user executes a CREATE FUNCTION x on Drillbit A and the command returns successfully, then:
    
    * If the same user immediately executes a query using x on the same Drillbit, that query succeeds.
    * If some other user executes a query using x (on any Drillbit, say Drillbit B), then the query either fails (x is not found) or succeeds (x is found on Drillbit B and on all Drillbits that run fragments.)
    
    In general, the above requires lots of synchronization: we'd want every Drillbit to synchronize with the registry before every query parse and fragment execution. We know that is expensive. So, we looked for special-case opportunities to exploit. The "found/not found" semantics above appeared to provide that special case. With that trick off the table, we are back to the massive synchronization solution, which is costly.
    
    We want to keep the semantics listed above, but without the cost of synchronization.
    Just tossing out ideas (I'm not (yet) proposing changes), perhaps:
    
    * Each Drillbit maintains a cache of the most recent ZK registry version it has seen.
    * When a Foreman registers a function, it updates the ZK registry and its locally-cached version number.
    * When a Foreman runs a query, it includes the registry version number in the physical plan sent to each Drillbit for execution.
    * The Drillbit checks the plan's version number against its cached version. If the plan version is newer, the Drillbit downloads the latest registry data and updates its cached version number.
    
    The above ensures that, if a function is registered on Drillbit A, then any query that is submitted to A will execute anywhere on the cluster. So far so good.
    
    But, what about queries submitted to Drillbits B, C and D? How do they learn about the new functions? Here, perhaps we can use an eventually consistent pattern. The other Drillbits listen for ZK change notifications and refresh their local registries from ZK, and update the locally cached version number.
    
    Now, we get the semantics that if a function is defined on Drillbit A, a brief time later it will be available on Drillbits B, C and D. Once available, the above rules apply: the registry version is written into the plan and all other fragment executors will force update their local registries if they haven't yet gotten the ZK update notices.
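    One way to picture this proposal is a single cached version number per Drillbit with two update paths: a background refresh when a ZK watch fires, and a forced refresh when an incoming plan carries a newer version. This is a hypothetical sketch, not existing Drill code; `RegistryVersionCache`, `onZkNotification`, `ensureAtLeast` and the `refresher` callback (standing in for downloading the latest registry and returning its version) are invented names.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical per-Drillbit cache of the newest registry version this Drillbit has loaded.
final class RegistryVersionCache {
  private final AtomicLong version = new AtomicLong(0);
  // Stand-in for "download the latest registry from ZK and return its version".
  private final LongSupplier refresher;
  private int forcedRefreshes = 0;

  RegistryVersionCache(LongSupplier refresher) {
    this.refresher = refresher;
  }

  long cachedVersion() {
    return version.get();
  }

  synchronized int forcedRefreshes() {
    return forcedRefreshes;
  }

  /** Eventually consistent path: a ZK watch fired, so refresh in the background. */
  synchronized void onZkNotification() {
    version.accumulateAndGet(refresher.getAsLong(), Math::max); // never move backwards
  }

  /** Execution path: the plan carries the Foreman's registry version. */
  void ensureAtLeast(long planVersion) {
    if (planVersion <= version.get()) {
      return; // common case: no lock, no ZK round trip
    }
    synchronized (this) {
      if (planVersion <= version.get()) {
        return; // another thread (or a ZK notification) caught up while we waited
      }
      forcedRefreshes++;
      version.accumulateAndGet(refresher.getAsLong(), Math::max);
    }
  }
}
```

    In this shape, a fragment whose plan carries an already-seen version never contacts ZK; only a fragment that arrives on a Drillbit that has not yet processed the change notification pays for a download.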
    
    The only drawback is a slight delay in a function becoming available to the cluster. But, that delay is fine in an eventually consistent design. Said another way, we make no transaction guarantees that updating something on Drillbit A will be immediately reflected on B, etc.
    
    This is a rough draft. It may be what the code does. I'll continue to review and revise the note as I learn more.



[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/701
  
    @paul-rogers, as we discussed, I have tried to find a way to preserve the lazy-init approach.
    I have renamed the PR to reflect the latest changes. Please find the new solution and a description of the changes below:
    
    Lazy-init was performed only when a function was not found during Calcite parsing, but DRILL-4963 shows cases where Calcite parsing passes (usually with function overloading) and yet the function is still not found. To handle such cases, we need to enhance the lazy-init process:
    1. The lazy-init process should be more lightweight. Currently, when a function is not found, we load all jars from the remote function registry and compare them with the jars in the local function registry. This is not optimal, especially when both registries are already in sync. To improve performance, I have introduced a remote function registry version, which can be used to check whether the remote and local registries need to be synced before checking jars and functions.
    2. During the parsing stage we were only catching Calcite parsing exceptions, but a missing function can also be indicated by a Drill function error and so on. Rather than enumerating every possible exception (more of which can be added to the code later), we check whether the remote and local function registries are in sync on any error. This check only affects queries that would fail anyway, so if failing takes a little longer, it won't make a significant difference.
    3. During the execution stage, Drill attempts to find a matching function among the functions with the same name. `DefaultFunctionResolver.getBestMatch()` does not do an exact match; it may return a function with different input parameter types. The best match is found according to the rules described in `TypeCastRules.class`. Currently we attempt to sync the remote and local function registries only if no best-matching function was found, but this is not correct: even if Drill finds the best-matching function among the current functions, the remote function registry may hold an even better match. To fix this, we first try to find the function using `ExactFunctionResolver.getBestMatch()`. If no exactly matching function is found, we check whether the remote and local function registries are in sync and then use `DefaultFunctionResolver.getBestMatch()` to find the best matching function. If an exactly matching function is found, we return it right away without any registry sync checks.
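    The resolution order in point 3 can be modeled in a few lines. This is a simplified sketch, not Drill's resolver code: `ExactFirstResolver`, `Fn` and the `syncIfNeeded` hook are hypothetical, and `bestMatch` is a crude stand-in for `DefaultFunctionResolver.getBestMatch()` that counts mismatched argument types instead of applying `TypeCastRules`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Simplified model of exact-match-first resolution; names and matching rules are hypothetical.
final class ExactFirstResolver {

  /** A registered function: a name plus argument types, e.g. f(BIGINT). */
  static final class Fn {
    final String name;
    final List<String> argTypes;

    Fn(String name, List<String> argTypes) {
      this.name = name;
      this.argTypes = argTypes;
    }
  }

  final List<Fn> localFunctions = new ArrayList<>();
  // Hypothetical sync hook: copies functions the local list is missing from the remote registry.
  private final Consumer<List<Fn>> syncIfNeeded;
  int syncChecks = 0;

  ExactFirstResolver(Consumer<List<Fn>> syncIfNeeded) {
    this.syncIfNeeded = syncIfNeeded;
  }

  Optional<Fn> resolve(String name, List<String> argTypes) {
    // 1. Exact match on name and argument types: return at once, with no registry check.
    for (Fn f : localFunctions) {
      if (f.name.equals(name) && f.argTypes.equals(argTypes)) {
        return Optional.of(f);
      }
    }
    // 2. No exact match: the remote registry may hold a better overload, so sync first.
    syncChecks++;
    syncIfNeeded.accept(localFunctions);
    // 3. Best (possibly inexact) match among the now-synced functions.
    return bestMatch(name, argTypes);
  }

  // Crude stand-in for DefaultFunctionResolver: same name and arity, fewest mismatched
  // argument types. Per the PR description, Drill itself applies the TypeCastRules rules.
  private Optional<Fn> bestMatch(String name, List<String> argTypes) {
    Fn best = null;
    int bestCost = Integer.MAX_VALUE;
    for (Fn f : localFunctions) {
      if (!f.name.equals(name) || f.argTypes.size() != argTypes.size()) {
        continue;
      }
      int cost = 0;
      for (int i = 0; i < argTypes.size(); i++) {
        if (!f.argTypes.get(i).equals(argTypes.get(i))) {
          cost++;
        }
      }
      if (cost < bestCost) {
        best = f;
        bestCost = cost;
      }
    }
    return Optional.ofNullable(best);
  }
}
```

    In this model, skipping step 2 would let a call to f(INT) silently resolve to a local f(BIGINT) even though the remote registry holds f(INT), which is the kind of mismatch DRILL-4963 describes.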
    
    Changes:
    1. Added a `consists` method to the PersistentStore interface that returns true if a key exists in the store and false otherwise. This method is needed to retrieve only the remote function registry version without its content (unlike `get`). We pull the remote function registry content only if the versions differ.
    2. Added a check that the remote and local function registries are in sync on any failure during the planning stage, and when no exactly matching function is found during the execution stage.
    3. Added additional debug messages for `CreateFunctionHandler` and `DropFunctionHandler`.
    4. Updated unit tests to reflect new changes.




[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on the issue:

    https://github.com/apache/drill/pull/701
  
    +1
    
    I have one naive question. Before we dive into MVCC, or try to improve the current ZK-based approach, have we considered adding a "refresh function registry" command, so that users can enforce that the function registry is consistent across the entire cluster? I remember someone (probably @amansinha100) once brought up this idea.
    
    1. How often would a user run into issues caused by inconsistent functions across drillbits? It sounds like we are adding non-negligible overhead for other queries that do not use dynamic UDFs or overloaded functions, just for the sake of resolving function inconsistency.
    2. Drill's function resolution happens not only at planning time but also at execution time (because the schema may only be known at execution time). This means the function registry could be checked multiple times during the query lifetime. Adding overhead to each check would slow down the entire query.
    3. With a "refresh function registry" command, the user has a way to ensure the function registry is consistent after issuing it. We could tell users that drillbits may run into various problems until "refresh function registry" is run, and therefore recommend always running it after creating a new UDF.
    




[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r99454038
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -140,27 +142,39 @@ public void register(DrillOperatorTable operatorTable) {
       }
     
       /**
    -   * Using the given <code>functionResolver</code>
    -   * finds Drill function implementation for given <code>functionCall</code>.
    -   * If function implementation was not found,
    -   * loads all missing remote functions and tries to find Drill implementation one more time.
    +   * First attempts to finds the Drill function implementation that matches the name, arg types and return type.
    +   * If exact function implementation was not found,
    +   * syncs local function registry with remote function registry if needed
    +   * and tries to find function implementation one more time
    --- End diff --
    
    While this sounds pretty good, consider a possible scenario. Suppose a user consistently uses an overloaded function. Every one of those queries will need to check with ZK. Drill is supposed to handle many concurrent queries. Each of those will trigger the check. Soon, we'll be pounding on ZK hundreds of times per second.
    
    The "not found" case was fine to force a sync since a user would not, presumably, continually issue such queries if the function really were undefined. But, the overloaded function case is possible, and can lead to performance issues.



[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r102920945
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---
    @@ -50,13 +47,56 @@
       private DrillSqlWorker() {
       }
     
    +  /**
    +   * Converts sql query string into query physical plan.
    +   *
    +   * @param context query context
    +   * @param sql sql query
    +   * @return query physical plan
    +   */
       public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlParseException, ValidationException,
           ForemanSetupException {
         return getPlan(context, sql, null);
       }
     
    +  /**
    +   * Converts sql query string into query physical plan.
    +   * In case of any errors (that might occur due to missing function implementation),
    +   * checks if local function registry should be synchronized with remote function registry.
    +   * If sync took place, reloads drill operator table
    +   * (since functions were added to / removed from local function registry)
    +   * and attempts to converts sql query string into query physical plan one more time.
    +   *
    +   * @param context query context
    +   * @param sql sql query
    +   * @param textPlan text plan
    +   * @return query physical plan
    +   */
       public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan)
           throws ForemanSetupException {
    +    Pointer<String> textPlanCopy = textPlan == null ? null : new Pointer<>(textPlan.value);
    +    try {
    +      return getQueryPlan(context, sql, textPlan);
    +    } catch (Exception e) {
    --- End diff --
    
    Rather than enumerating every possible exception (more of which can be added to the code later), we check whether the remote and local function registries are in sync on any error. This check only affects queries that would fail anyway, so if failing takes a little longer, it won't make a significant difference. This will be removed once MVCC (multi-version concurrency control) is implemented: when a query fails, we won't check whether our function registry is up to date, since MVCC will have ensured that before query startup.
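    The catch-any-error-then-retry behavior described here can be sketched generically. `PlanRetrySketch`, `Planner` and `planWithRetry` are hypothetical names; per the diff, the actual `getPlan` also copies `textPlan` before the first attempt and reloads the operator table before retrying, which this sketch leaves out.

```java
import java.util.function.BooleanSupplier;

// Hypothetical retry-once-after-sync wrapper; not Drill's DrillSqlWorker code.
final class PlanRetrySketch {

  /** A planning step that may fail, e.g. because a function is missing locally. */
  interface Planner<T> {
    T plan() throws Exception;
  }

  /**
   * Runs the planner. On any failure, asks the (hypothetical) sync hook whether the local
   * registry changed; if it did, plans exactly once more, otherwise rethrows the original error.
   */
  static <T> T planWithRetry(Planner<T> planner, BooleanSupplier syncChangedRegistry) throws Exception {
    try {
      return planner.plan();
    } catch (Exception e) {
      // Only queries that are already failing pay for the registry check.
      if (!syncChangedRegistry.getAsBoolean()) {
        throw e; // nothing new was loaded, so the original error stands
      }
      return planner.plan(); // second and final attempt against the refreshed registry
    }
  }
}
```

    Retrying only when the sync actually changed something keeps genuinely invalid queries from being planned twice.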



[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r102919942
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -260,76 +293,101 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() {
       }
     
       /**
    -   * Attempts to load and register functions from remote function registry.
    -   * First checks if there is no missing jars.
    -   * If yes, enters synchronized block to prevent other loading the same jars.
    -   * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock).
    -   * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
    -   * Jar registration timestamp represented in milliseconds is used as suffix.
    -   * Then registers all jars at the same time. Returns true when finished.
    -   * In case if any errors during jars coping or registration, logs errors and proceeds.
    +   * Purpose of this method is to synchronize remote and local function registries if needed
    +   * and to inform if function registry was changed after given version.
        *
    -   * If no missing jars are found, checks current local registry version.
    -   * Returns false if versions match, true otherwise.
    +   * To make synchronization as much light-weigh as possible, first only versions of both registries are checked
    +   * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars.
    +   * The need of synchronization is checked again (double-check lock) before comparing jars.
    +   * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}.
    +   * Once jar download is finished, all missing jars are registered in one batch.
    +   * In case if any errors during jars download / registration, these errors are logged.
        *
    -   * @param version local function registry version
    -   * @return true if new jars were registered or local function registry version is different, false otherwise
    +   * During registration local function registry is updated with remote function registry version it is synced with.
    +   * When at least one jar of the missing jars failed to download / register,
    +   * local function registry version are not updated but jars that where successfully downloaded / registered
    +   * are added to local function registry.
    +   *
    +   * If synchronization between remote and local function registry was not needed,
    +   * checks if given registry version matches latest sync version
    +   * to inform if function registry was changed after given version.
    +   *
    +   * @param version remote function registry local function registry was based on
    +   * @return true if remote and local function registries were synchronized after given version
        */
    -  public boolean loadRemoteFunctions(long version) {
    -    List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -    if (!missingJars.isEmpty()) {
    +  public boolean syncWithRemoteRegistry(long version) {
    +    if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
           synchronized (this) {
    -        missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -        if (!missingJars.isEmpty()) {
    -          logger.info("Starting dynamic UDFs lazy-init process.\n" +
    -              "The following jars are going to be downloaded and registered locally: " + missingJars);
    +        long localRegistryVersion = localFunctionRegistry.getVersion();
    +        if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion))  {
    +          DataChangeVersion remoteVersion = new DataChangeVersion();
    +          List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion);
               List<JarScan> jars = Lists.newArrayList();
    -          for (String jarName : missingJars) {
    -            Path binary = null;
    -            Path source = null;
    -            URLClassLoader classLoader = null;
    -            try {
    -              binary = copyJarToLocal(jarName, remoteFunctionRegistry);
    -              source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
    -              URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    -              classLoader = new URLClassLoader(urls);
    -              ScanResult scanResult = scan(classLoader, binary, urls);
    -              localFunctionRegistry.validate(jarName, scanResult);
    -              jars.add(new JarScan(jarName, scanResult, classLoader));
    -            } catch (Exception e) {
    -              deleteQuietlyLocalJar(binary);
    -              deleteQuietlyLocalJar(source);
    -              if (classLoader != null) {
    -                try {
    -                  classLoader.close();
    -                } catch (Exception ex) {
    -                  logger.warn("Problem during closing class loader for {}", jarName, e);
    +          if (!missingJars.isEmpty()) {
    +            logger.info("Starting dynamic UDFs lazy-init process.\n" +
    +                "The following jars are going to be downloaded and registered locally: " + missingJars);
    +            for (String jarName : missingJars) {
    +              Path binary = null;
    +              Path source = null;
    +              URLClassLoader classLoader = null;
    +              try {
    +                binary = copyJarToLocal(jarName, this.remoteFunctionRegistry);
    +                source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry);
    +                URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    +                classLoader = new URLClassLoader(urls);
    +                ScanResult scanResult = scan(classLoader, binary, urls);
    +                localFunctionRegistry.validate(jarName, scanResult);
    +                jars.add(new JarScan(jarName, scanResult, classLoader));
    +              } catch (Exception e) {
    +                deleteQuietlyLocalJar(binary);
    +                deleteQuietlyLocalJar(source);
    +                if (classLoader != null) {
    +                  try {
    +                    classLoader.close();
    +                  } catch (Exception ex) {
    +                    logger.warn("Problem during closing class loader for {}", jarName, e);
    +                  }
                     }
    +                logger.error("Problem during remote functions load from {}", jarName, e);
                   }
    -              logger.error("Problem during remote functions load from {}", jarName, e);
                 }
               }
    -          if (!jars.isEmpty()) {
    -            localFunctionRegistry.register(jars);
    -            return true;
    -          }
    +          long latestRegistryVersion = jars.size() != missingJars.size() ?
    +              localRegistryVersion : remoteVersion.getVersion();
    +          localFunctionRegistry.register(jars, latestRegistryVersion);
    +          return true;
             }
           }
         }
    +
         return version != localFunctionRegistry.getVersion();
       }
     
       /**
    -   * First finds path to marker file url, otherwise throws {@link JarValidationException}.
    -   * Then scans jar classes according to list indicated in marker files.
    -   * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}.
    -   * This is extremely important for Windows users where system doesn't allow to delete file if it's being used.
    +   * Checks if local function registry should be synchronized with remote function registry.
    +   * If remote function registry version is -1, it means that remote function registry is unreachable
    +   * or is not configured thus we skip synchronization and return false.
    +   * In all other cases synchronization is needed if remote and local function registries versions do not match.
    --- End diff --
    
    At drillbit startup, if the remote function registry doesn't exist, we put an empty remote function registry into ZooKeeper with version 0. So we can guarantee that once any dynamic UDF is added, the registry version will be at least 1.
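    A small sketch of the version rules from this Javadoc and comment. The name `doSyncFunctionRegistries` comes from the diff, but these bodies are assumptions reconstructed from the Javadoc and from the `latestRegistryVersion` expression, not the PR's code; `versionAfterSync`, `UNREACHABLE` and `INITIAL_VERSION` are hypothetical names. Because an empty registry is created with version 0 at startup, a reachable registry never reports -1.

```java
// Version rules modeled on the Javadoc and comments in this PR; the bodies are assumptions.
final class RegistryVersionRules {

  /** Remote version reported when the remote registry is unreachable or not configured. */
  static final long UNREACHABLE = -1;

  /** Version of the empty remote registry created at Drillbit startup. */
  static final long INITIAL_VERSION = 0;

  /** Sync only if the remote registry is reachable and its version differs from the local one. */
  static boolean doSyncFunctionRegistries(long remoteVersion, long localVersion) {
    if (remoteVersion == UNREACHABLE) {
      return false; // nothing to sync against; keep using the local registry
    }
    return remoteVersion != localVersion;
  }

  /**
   * Version to record locally after a sync attempt. If any missing jar failed to load,
   * keep the old local version so the next check still sees a mismatch and retries.
   */
  static long versionAfterSync(int missingJars, int loadedJars, long localVersion, long remoteVersion) {
    return loadedJars != missingJars ? localVersion : remoteVersion;
  }
}
```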



[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r99649785
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -260,76 +293,101 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() {
       }
     
       /**
    -   * Attempts to load and register functions from remote function registry.
    -   * First checks if there is no missing jars.
    -   * If yes, enters synchronized block to prevent other loading the same jars.
    -   * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock).
    -   * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
    -   * Jar registration timestamp represented in milliseconds is used as suffix.
    -   * Then registers all jars at the same time. Returns true when finished.
    -   * In case if any errors during jars coping or registration, logs errors and proceeds.
    +   * Purpose of this method is to synchronize remote and local function registries if needed
    +   * and to inform if function registry was changed after given version.
        *
    -   * If no missing jars are found, checks current local registry version.
    -   * Returns false if versions match, true otherwise.
    +   * To make synchronization as much light-weigh as possible, first only versions of both registries are checked
    +   * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars.
    +   * The need of synchronization is checked again (double-check lock) before comparing jars.
    +   * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}.
    +   * Once jar download is finished, all missing jars are registered in one batch.
    +   * In case if any errors during jars download / registration, these errors are logged.
        *
    -   * @param version local function registry version
    -   * @return true if new jars were registered or local function registry version is different, false otherwise
    +   * During registration local function registry is updated with remote function registry version it is synced with.
    +   * When at least one jar of the missing jars failed to download / register,
    +   * local function registry version are not updated but jars that where successfully downloaded / registered
    +   * are added to local function registry.
    +   *
    +   * If synchronization between remote and local function registry was not needed,
    +   * checks if given registry version matches latest sync version
    +   * to inform if function registry was changed after given version.
    +   *
    +   * @param version remote function registry local function registry was based on
    +   * @return true if remote and local function registries were synchronized after given version
        */
    -  public boolean loadRemoteFunctions(long version) {
    -    List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -    if (!missingJars.isEmpty()) {
    +  public boolean syncWithRemoteRegistry(long version) {
    +    if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
           synchronized (this) {
    -        missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -        if (!missingJars.isEmpty()) {
    -          logger.info("Starting dynamic UDFs lazy-init process.\n" +
    -              "The following jars are going to be downloaded and registered locally: " + missingJars);
    +        long localRegistryVersion = localFunctionRegistry.getVersion();
    +        if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion))  {
    +          DataChangeVersion remoteVersion = new DataChangeVersion();
    +          List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion);
               List<JarScan> jars = Lists.newArrayList();
    -          for (String jarName : missingJars) {
    -            Path binary = null;
    -            Path source = null;
    -            URLClassLoader classLoader = null;
    -            try {
    -              binary = copyJarToLocal(jarName, remoteFunctionRegistry);
    -              source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
    -              URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    -              classLoader = new URLClassLoader(urls);
    -              ScanResult scanResult = scan(classLoader, binary, urls);
    -              localFunctionRegistry.validate(jarName, scanResult);
    -              jars.add(new JarScan(jarName, scanResult, classLoader));
    -            } catch (Exception e) {
    -              deleteQuietlyLocalJar(binary);
    -              deleteQuietlyLocalJar(source);
    -              if (classLoader != null) {
    -                try {
    -                  classLoader.close();
    -                } catch (Exception ex) {
    -                  logger.warn("Problem during closing class loader for {}", jarName, e);
    +          if (!missingJars.isEmpty()) {
    +            logger.info("Starting dynamic UDFs lazy-init process.\n" +
    +                "The following jars are going to be downloaded and registered locally: " + missingJars);
    +            for (String jarName : missingJars) {
    +              Path binary = null;
    +              Path source = null;
    +              URLClassLoader classLoader = null;
    +              try {
    +                binary = copyJarToLocal(jarName, this.remoteFunctionRegistry);
    +                source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry);
    +                URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    +                classLoader = new URLClassLoader(urls);
    +                ScanResult scanResult = scan(classLoader, binary, urls);
    +                localFunctionRegistry.validate(jarName, scanResult);
    +                jars.add(new JarScan(jarName, scanResult, classLoader));
    +              } catch (Exception e) {
    +                deleteQuietlyLocalJar(binary);
    +                deleteQuietlyLocalJar(source);
    +                if (classLoader != null) {
    +                  try {
    +                    classLoader.close();
    +                  } catch (Exception ex) {
    +                    logger.warn("Problem during closing class loader for {}", jarName, e);
    +                  }
                     }
    +                logger.error("Problem during remote functions load from {}", jarName, e);
                   }
    -              logger.error("Problem during remote functions load from {}", jarName, e);
                 }
               }
    -          if (!jars.isEmpty()) {
    -            localFunctionRegistry.register(jars);
    -            return true;
    -          }
    +          long latestRegistryVersion = jars.size() != missingJars.size() ?
    +              localRegistryVersion : remoteVersion.getVersion();
    +          localFunctionRegistry.register(jars, latestRegistryVersion);
    +          return true;
             }
           }
         }
    +
         return version != localFunctionRegistry.getVersion();
       }
     
       /**
    -   * First finds path to marker file url, otherwise throws {@link JarValidationException}.
    -   * Then scans jar classes according to list indicated in marker files.
    -   * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}.
    -   * This is extremely important for Windows users where system doesn't allow to delete file if it's being used.
    +   * Checks if local function registry should be synchronized with remote function registry.
    +   * If remote function registry version is -1, it means that remote function registry is unreachable
    +   * or is not configured thus we skip synchronization and return false.
    +   * In all other cases synchronization is needed if remote and local function registries versions do not match.
        *
    -   * @param classLoader unique class loader for jar
    -   * @param path local path to jar
    -   * @param urls urls associated with the jar (ex: binary and source)
    -   * @return scan result of packages, classes, annotations found in jar
    +   * @param remoteVersion remote function registry version
    +   * @param localVersion local function registry version
    +   * @return true is local registry should be refreshed, false otherwise
        */
    +  private boolean doSyncFunctionRegistries(long remoteVersion, long localVersion) {
    --- End diff --
    
    Nit: This is a query, so maybe `isRegistrySyncNeeded`? The "do" prefix is often used for the method that performs the work, so given the name I expected this to be the implementation of the registry sync itself.
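Here is a minimal sketch of the check under the suggested query-style name. It assumes the semantics stated in the Javadoc for `doSyncFunctionRegistries`: a remote version of -1 means the remote registry is unreachable or not configured. `RegistrySyncCheck` and `UNREACHABLE` are hypothetical names used only for illustration.

```java
/** Hypothetical helper; illustrates the query-style name only, not Drill's actual code. */
final class RegistrySyncCheck {
  /** Assumed sentinel, per the Javadoc: remote registry unreachable or not configured. */
  static final long UNREACHABLE = -1L;

  private RegistrySyncCheck() {
  }

  /**
   * Answers whether the local registry should be synced with the remote one; performs no sync itself.
   *
   * @param remoteVersion remote function registry version
   * @param localVersion version the local registry was last synced with
   * @return true if the local registry should be refreshed, false otherwise
   */
  static boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) {
    if (remoteVersion == UNREACHABLE) {
      return false; // nothing reachable to sync against
    }
    return remoteVersion != localVersion;
  }
}
```

With the `is` prefix, a call site such as `if (isRegistrySyncNeeded(remote, local)) { synchronized (this) { ... } }` reads as a question. That leaves a `do...` name free for the method that actually downloads and registers the jars.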


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r99454180
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -178,22 +192,41 @@ private String functionReplacement(FunctionCall functionCall) {
       }
     
       /**
    -   * Find the Drill function implementation that matches the name, arg types and return type.
    -   * If exact function implementation was not found,
    -   * loads all missing remote functions and tries to find Drill implementation one more time.
    +   * Finds the Drill function implementation that matches the name, arg types and return type.
    +   *
    +   * @param name function name
    +   * @param argTypes input parameters types
    +   * @param returnType function return type
    +   * @return exactly matching function holder
    --- End diff --
    
    Thanks for adding the Javadoc! Very helpful.


[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/701
  
    Thank you for the very clear explanation. It is now obvious that the original lazy-init design relied on the "found/not found" semantics of a simple name match: there is no ambiguity there. With the overload rules, however, there is ambiguity. Clearly, at the planning stage we need to check that our entire list of candidate functions is consistent with ZK. Your design for that seems quite good.
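For reference, the design under discussion can be sketched as a simplified, self-contained model. It works in three steps: a lock-free version comparison, a double-checked re-test under `synchronized`, and a batch load in which the synced version advances only when every missing jar succeeds.

Everything here is hypothetical. `RemoteSource`, `RemoteSnapshot`, `LocalRegistrySketch` and the `loadJar` predicate stand in for Drill's remote store, its `DataChangeVersion` handling and its jar download/scan code. The -1 "unreachable" case is left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical remote state: a jar list together with the version it was read at. */
final class RemoteSnapshot {
  final long version;
  final Set<String> jars;

  RemoteSnapshot(long version, Set<String> jars) {
    this.version = version;
    this.jars = jars;
  }
}

/** Hypothetical stand-in for the ZooKeeper-backed remote registry. */
interface RemoteSource {
  long version();            // cheap: reads the version only
  RemoteSnapshot snapshot(); // expensive: reads content plus its matching version
}

final class LocalRegistrySketch {
  private final RemoteSource remote;
  private final Predicate<String> loadJar; // hypothetical: download + scan + validate, false on failure
  private final Set<String> localJars = new HashSet<>();
  private volatile long syncedVersion = 0L;

  LocalRegistrySketch(RemoteSource remote, Predicate<String> loadJar) {
    this.remote = remote;
    this.loadJar = loadJar;
  }

  long syncedVersion() {
    return syncedVersion;
  }

  synchronized Set<String> jars() {
    return new HashSet<>(localJars);
  }

  /** Returns true if a sync ran now, or the registry changed after callerVersion. */
  boolean syncWithRemote(long callerVersion) {
    if (remote.version() != syncedVersion) {          // 1st check: versions only, no lock
      synchronized (this) {
        long before = syncedVersion;
        if (remote.version() != before) {             // 2nd check under the lock
          RemoteSnapshot snap = remote.snapshot();
          List<String> loaded = new ArrayList<>();
          int missing = 0;
          for (String jar : snap.jars) {
            if (localJars.contains(jar)) {
              continue;
            }
            missing++;
            if (loadJar.test(jar)) {
              loaded.add(jar);
            }
          }
          localJars.addAll(loaded);                   // successful jars are kept either way
          // Advance the version only if every missing jar loaded, so failures are retried later.
          syncedVersion = loaded.size() == missing ? snap.version : before;
          return true;
        }
      }
    }
    return callerVersion != syncedVersion;
  }
}
```

Keeping the successfully loaded jars while holding back the version means a partial failure is retried on the next call, at the cost of one more remote read.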
    
    I wonder, at execution time, do we save a fully qualified function name so we can go back to the "found/not found" trick? That is, do we save the function as "foo( INT, VARCHAR )" or do we repeat the type inference at runtime?
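One way to get back to plain found/not-found semantics would be to key implementations by an exact signature such as `foo(INT, VARCHAR)`. The sketch below is hypothetical: `FunctionSignature` and `ExactLookup` are not Drill classes. It assumes case-insensitive function names and ignores the return type for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical exact-signature key, e.g. "foo(INT, VARCHAR)". */
final class FunctionSignature {
  final String name;
  final List<String> argTypes;

  FunctionSignature(String name, String... argTypes) {
    this.name = name.toLowerCase(Locale.ROOT);      // assumption: names are case-insensitive
    this.argTypes = Arrays.asList(argTypes.clone()); // defensive copy of the varargs array
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FunctionSignature)) {
      return false;
    }
    FunctionSignature other = (FunctionSignature) o;
    return name.equals(other.name) && argTypes.equals(other.argTypes);
  }

  @Override
  public int hashCode() {
    return 31 * name.hashCode() + argTypes.hashCode();
  }

  @Override
  public String toString() {
    return name + "(" + String.join(", ", argTypes) + ")";
  }
}

/** Exact lookup has plain found/not-found semantics: no overload resolution involved. */
final class ExactLookup {
  private final Map<FunctionSignature, String> holders = new HashMap<>();

  void register(FunctionSignature sig, String holder) {
    holders.put(sig, holder);
  }

  /** Returns the holder, or null if this exact signature is not registered locally. */
  String find(FunctionSignature sig) {
    return holders.get(sig);
  }
}
```

If the plan carried such a key, a miss at execution time would unambiguously mean the implementation is absent locally. That is the condition under which a sync and retry is worthwhile.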
    
    I will do a detailed review of the code next.


[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r99454556
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -260,76 +293,101 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() {
       }
     
       /**
    -   * Attempts to load and register functions from remote function registry.
    -   * First checks if there is no missing jars.
    -   * If yes, enters synchronized block to prevent other loading the same jars.
    -   * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock).
    -   * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
    -   * Jar registration timestamp represented in milliseconds is used as suffix.
    -   * Then registers all jars at the same time. Returns true when finished.
    -   * In case if any errors during jars coping or registration, logs errors and proceeds.
    +   * Purpose of this method is to synchronize remote and local function registries if needed
    +   * and to inform if function registry was changed after given version.
        *
    -   * If no missing jars are found, checks current local registry version.
    -   * Returns false if versions match, true otherwise.
    +   * To make synchronization as much light-weigh as possible, first only versions of both registries are checked
    +   * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars.
    +   * The need of synchronization is checked again (double-check lock) before comparing jars.
    +   * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}.
    +   * Once jar download is finished, all missing jars are registered in one batch.
    +   * In case if any errors during jars download / registration, these errors are logged.
        *
    -   * @param version local function registry version
    -   * @return true if new jars were registered or local function registry version is different, false otherwise
    +   * During registration local function registry is updated with remote function registry version it is synced with.
    +   * When at least one jar of the missing jars failed to download / register,
    +   * local function registry version are not updated but jars that where successfully downloaded / registered
    +   * are added to local function registry.
    +   *
    +   * If synchronization between remote and local function registry was not needed,
    +   * checks if given registry version matches latest sync version
    +   * to inform if function registry was changed after given version.
    +   *
    +   * @param version remote function registry local function registry was based on
    +   * @return true if remote and local function registries were synchronized after given version
        */
    -  public boolean loadRemoteFunctions(long version) {
    -    List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -    if (!missingJars.isEmpty()) {
    +  public boolean syncWithRemoteRegistry(long version) {
    +    if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
           synchronized (this) {
    -        missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -        if (!missingJars.isEmpty()) {
    -          logger.info("Starting dynamic UDFs lazy-init process.\n" +
    -              "The following jars are going to be downloaded and registered locally: " + missingJars);
    +        long localRegistryVersion = localFunctionRegistry.getVersion();
    +        if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion))  {
    +          DataChangeVersion remoteVersion = new DataChangeVersion();
    +          List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion);
               List<JarScan> jars = Lists.newArrayList();
    -          for (String jarName : missingJars) {
    -            Path binary = null;
    -            Path source = null;
    -            URLClassLoader classLoader = null;
    -            try {
    -              binary = copyJarToLocal(jarName, remoteFunctionRegistry);
    -              source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
    -              URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    -              classLoader = new URLClassLoader(urls);
    -              ScanResult scanResult = scan(classLoader, binary, urls);
    -              localFunctionRegistry.validate(jarName, scanResult);
    -              jars.add(new JarScan(jarName, scanResult, classLoader));
    -            } catch (Exception e) {
    -              deleteQuietlyLocalJar(binary);
    -              deleteQuietlyLocalJar(source);
    -              if (classLoader != null) {
    -                try {
    -                  classLoader.close();
    -                } catch (Exception ex) {
    -                  logger.warn("Problem during closing class loader for {}", jarName, e);
    +          if (!missingJars.isEmpty()) {
    +            logger.info("Starting dynamic UDFs lazy-init process.\n" +
    +                "The following jars are going to be downloaded and registered locally: " + missingJars);
    +            for (String jarName : missingJars) {
    +              Path binary = null;
    +              Path source = null;
    +              URLClassLoader classLoader = null;
    +              try {
    +                binary = copyJarToLocal(jarName, this.remoteFunctionRegistry);
    +                source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry);
    +                URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    +                classLoader = new URLClassLoader(urls);
    +                ScanResult scanResult = scan(classLoader, binary, urls);
    +                localFunctionRegistry.validate(jarName, scanResult);
    +                jars.add(new JarScan(jarName, scanResult, classLoader));
    +              } catch (Exception e) {
    +                deleteQuietlyLocalJar(binary);
    --- End diff --
    
    Race condition?
    
    Suppose I have a query:
    ```
    SELECT fn1(a), fn2(b) FROM ...
    ```
    
    `fn1` is an existing Dynamic UDF (DUDF). `fn2()` is overloaded. We do the above check for `fn2`, and in that check we learn that `fn1` has been deleted. But we have already passed the checks for `fn1`, so we proceed to execute the query, and `fn1` then fails.
    
    Also, consider a different query:
    
    ```
    SELECT fn2(b), fn3(b) FROM ...
    ```
    
    Now, both functions are overloaded. Both will trigger the remote version check.
    
    In both cases, we have a non-atomic check; the query can be processed using two different versions of the remote registry.
    
    What we need to do is either:
    
    1. If we need to do a check, restart all function checks (or at least those that referred to a DUDF.)
    2. Do the version check once at the start of the query parse, and stick with that version for the entire query.
    
    Neither option is good. Option 1 causes the race conditions outlined here. Option 2 is slow.
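Here is a minimal sketch of option 2, assuming the local registry could be published as immutable snapshots. `RegistrySnapshot` and `SnapshotRegistry` are hypothetical, not Drill classes. Each query pins one snapshot when parsing starts, so `fn1` and `fn2` resolve against the same version even if a sync is published mid-query. The remote version check that would precede pinning is not shown; presumably that check is where the cost mentioned here comes from.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical immutable registry snapshot: a version plus the functions visible at that version. */
final class RegistrySnapshot {
  final long version;
  private final Map<String, String> functions; // name -> implementation id (illustrative only)

  RegistrySnapshot(long version, Map<String, String> functions) {
    this.version = version;
    this.functions = Collections.unmodifiableMap(new HashMap<>(functions));
  }

  String resolve(String name) {
    return functions.get(name);
  }
}

/** Publishes whole snapshots atomically; readers never observe a half-applied sync. */
final class SnapshotRegistry {
  private final AtomicReference<RegistrySnapshot> current;

  SnapshotRegistry(RegistrySnapshot initial) {
    current = new AtomicReference<>(initial);
  }

  /** Called once at the start of a query; every function in that query resolves against this snapshot. */
  RegistrySnapshot pinForQuery() {
    return current.get();
  }

  /** A sync builds the next snapshot and swaps it in; queries already pinned keep their old one. */
  void publish(RegistrySnapshot next) {
    current.set(next);
  }
}
```

Publishing by swapping a reference keeps readers lock-free. A sync builds the next snapshot off to the side and never mutates one that a running query holds.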


[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r102921207
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -140,27 +142,39 @@ public void register(DrillOperatorTable operatorTable) {
       }
     
       /**
    -   * Using the given <code>functionResolver</code>
    -   * finds Drill function implementation for given <code>functionCall</code>.
    -   * If function implementation was not found,
    -   * loads all missing remote functions and tries to find Drill implementation one more time.
    +   * First attempts to finds the Drill function implementation that matches the name, arg types and return type.
    +   * If exact function implementation was not found,
    +   * syncs local function registry with remote function registry if needed
    +   * and tries to find function implementation one more time
    --- End diff --
    
    As agreed, we'll leave this implementation as is until MVCC is implemented. MVCC will ensure we have an up-to-date function registry before the query planning and execution stages.
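The lookup flow described in that Javadoc can be sketched generically: try to find the implementation; if it is missing, sync with the remote registry if needed and try once more. `RetryingLookup` and its three function parameters are hypothetical stand-ins for the local lookup, the local registry version and `syncWithRemoteRegistry(version)`. The version is captured before the first attempt, so a sync performed by another thread in between is still detected.

```java
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.function.LongSupplier;

/** Hypothetical two-attempt lookup; K and V stand in for Drill's call and function holder types. */
final class RetryingLookup<K, V> {
  private final Function<K, V> localFind;   // exact local lookup, null when not found
  private final LongSupplier localVersion;  // version the local registry last synced to
  private final LongPredicate syncIfNeeded; // true if the registry changed after the given version

  RetryingLookup(Function<K, V> localFind, LongSupplier localVersion, LongPredicate syncIfNeeded) {
    this.localFind = localFind;
    this.localVersion = localVersion;
    this.syncIfNeeded = syncIfNeeded;
  }

  V find(K key) {
    long version = localVersion.getAsLong(); // captured before the first attempt
    V holder = localFind.apply(key);
    if (holder == null && syncIfNeeded.test(version)) {
      holder = localFind.apply(key);         // second attempt only if something changed
    }
    return holder;
  }
}
```

In this sketch, the second lookup runs only when the registry actually changed. A genuinely unknown function therefore costs one call to `syncIfNeeded` rather than a repeated lookup.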


[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r99650496
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---
    @@ -50,13 +47,56 @@
       private DrillSqlWorker() {
       }
     
    +  /**
    +   * Converts sql query string into query physical plan.
    +   *
    +   * @param context query context
    +   * @param sql sql query
    +   * @return query physical plan
    +   */
       public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlParseException, ValidationException,
           ForemanSetupException {
         return getPlan(context, sql, null);
       }
     
    +  /**
    +   * Converts sql query string into query physical plan.
    +   * In case of any errors (that might occur due to missing function implementation),
    +   * checks if local function registry should be synchronized with remote function registry.
    +   * If sync took place, reloads drill operator table
    +   * (since functions were added to / removed from local function registry)
    +   * and attempts to converts sql query string into query physical plan one more time.
    +   *
    +   * @param context query context
    +   * @param sql sql query
    +   * @param textPlan text plan
    +   * @return query physical plan
    +   */
       public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan)
           throws ForemanSetupException {
    +    Pointer<String> textPlanCopy = textPlan == null ? null : new Pointer<>(textPlan.value);
    +    try {
    +      return getQueryPlan(context, sql, textPlan);
    +    } catch (Exception e) {
    --- End diff --
    
    Should we be more specific in the error we catch? Wouldn't this mean that, even for a simple syntax error, we'd resync and retry? Can we catch only the specific function error of interest?
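Here is a minimal sketch of the narrower catch suggested here, assuming planning could surface missing implementations through a dedicated exception type. `FunctionNotFoundException`, `PlanStep` and `PlanWithRetry` are hypothetical names, not existing Drill or Calcite classes. A syntax error or any other failure propagates without a resync.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical signal for "no matching function implementation"; not a real Drill/Calcite class. */
class FunctionNotFoundException extends RuntimeException {
  FunctionNotFoundException(String message) {
    super(message);
  }
}

/** Hypothetical planning step: turns SQL into some plan type T. */
@FunctionalInterface
interface PlanStep<T> {
  T plan(String sql);
}

final class PlanWithRetry {
  private PlanWithRetry() {
  }

  /**
   * Plans once. Only a function-resolution failure triggers a registry sync and a second attempt;
   * any other exception (for example, a syntax error) propagates unchanged.
   *
   * @param syncChangedRegistry hypothetical hook: syncs and returns true if the local registry changed
   */
  static <T> T getPlan(PlanStep<T> step, String sql, BooleanSupplier syncChangedRegistry) {
    try {
      return step.plan(sql);
    } catch (FunctionNotFoundException e) {
      if (!syncChangedRegistry.getAsBoolean()) {
        throw e; // registry unchanged: a retry would fail the same way
      }
      return step.plan(sql);
    }
  }
}
```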


[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/701


[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r102921712
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -260,76 +293,101 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() {
       }
     
       /**
    -   * Attempts to load and register functions from remote function registry.
    -   * First checks if there is no missing jars.
    -   * If yes, enters synchronized block to prevent other loading the same jars.
    -   * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock).
    -   * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
    -   * Jar registration timestamp represented in milliseconds is used as suffix.
    -   * Then registers all jars at the same time. Returns true when finished.
    -   * In case if any errors during jars coping or registration, logs errors and proceeds.
    +   * Purpose of this method is to synchronize remote and local function registries if needed
    +   * and to inform if function registry was changed after given version.
        *
    -   * If no missing jars are found, checks current local registry version.
    -   * Returns false if versions match, true otherwise.
    +   * To make synchronization as much light-weigh as possible, first only versions of both registries are checked
    +   * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars.
    +   * The need of synchronization is checked again (double-check lock) before comparing jars.
    +   * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}.
    +   * Once jar download is finished, all missing jars are registered in one batch.
    +   * In case if any errors during jars download / registration, these errors are logged.
        *
    -   * @param version local function registry version
    -   * @return true if new jars were registered or local function registry version is different, false otherwise
    +   * During registration local function registry is updated with remote function registry version it is synced with.
    +   * When at least one jar of the missing jars failed to download / register,
    +   * local function registry version are not updated but jars that where successfully downloaded / registered
    +   * are added to local function registry.
    +   *
    +   * If synchronization between remote and local function registry was not needed,
    +   * checks if given registry version matches latest sync version
    +   * to inform if function registry was changed after given version.
    +   *
    +   * @param version remote function registry local function registry was based on
    +   * @return true if remote and local function registries were synchronized after given version
        */
    -  public boolean loadRemoteFunctions(long version) {
    -    List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -    if (!missingJars.isEmpty()) {
    +  public boolean syncWithRemoteRegistry(long version) {
    +    if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
           synchronized (this) {
    -        missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -        if (!missingJars.isEmpty()) {
    -          logger.info("Starting dynamic UDFs lazy-init process.\n" +
    -              "The following jars are going to be downloaded and registered locally: " + missingJars);
    +        long localRegistryVersion = localFunctionRegistry.getVersion();
    +        if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion))  {
    +          DataChangeVersion remoteVersion = new DataChangeVersion();
    +          List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion);
               List<JarScan> jars = Lists.newArrayList();
    -          for (String jarName : missingJars) {
    -            Path binary = null;
    -            Path source = null;
    -            URLClassLoader classLoader = null;
    -            try {
    -              binary = copyJarToLocal(jarName, remoteFunctionRegistry);
    -              source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
    -              URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    -              classLoader = new URLClassLoader(urls);
    -              ScanResult scanResult = scan(classLoader, binary, urls);
    -              localFunctionRegistry.validate(jarName, scanResult);
    -              jars.add(new JarScan(jarName, scanResult, classLoader));
    -            } catch (Exception e) {
    -              deleteQuietlyLocalJar(binary);
    -              deleteQuietlyLocalJar(source);
    -              if (classLoader != null) {
    -                try {
    -                  classLoader.close();
    -                } catch (Exception ex) {
    -                  logger.warn("Problem during closing class loader for {}", jarName, e);
    +          if (!missingJars.isEmpty()) {
    +            logger.info("Starting dynamic UDFs lazy-init process.\n" +
    +                "The following jars are going to be downloaded and registered locally: " + missingJars);
    +            for (String jarName : missingJars) {
    +              Path binary = null;
    +              Path source = null;
    +              URLClassLoader classLoader = null;
    +              try {
    +                binary = copyJarToLocal(jarName, this.remoteFunctionRegistry);
    +                source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry);
    +                URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    +                classLoader = new URLClassLoader(urls);
    +                ScanResult scanResult = scan(classLoader, binary, urls);
    +                localFunctionRegistry.validate(jarName, scanResult);
    +                jars.add(new JarScan(jarName, scanResult, classLoader));
    +              } catch (Exception e) {
    +                deleteQuietlyLocalJar(binary);
    --- End diff --
    
    Makes sense. As agreed, we'll leave the current implementation as is until MVCC is implemented. MVCC will ensure that a query runs against the same function registry version on all drillbits.


[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r99649541
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -260,76 +293,101 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() {
       }
     
       /**
    -   * Attempts to load and register functions from remote function registry.
    -   * First checks if there is no missing jars.
    -   * If yes, enters synchronized block to prevent other loading the same jars.
    -   * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock).
    -   * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
    -   * Jar registration timestamp represented in milliseconds is used as suffix.
    -   * Then registers all jars at the same time. Returns true when finished.
    -   * In case if any errors during jars coping or registration, logs errors and proceeds.
    +   * Purpose of this method is to synchronize remote and local function registries if needed
    +   * and to inform if function registry was changed after given version.
        *
    -   * If no missing jars are found, checks current local registry version.
    -   * Returns false if versions match, true otherwise.
    +   * To make synchronization as much light-weigh as possible, first only versions of both registries are checked
    +   * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars.
    +   * The need of synchronization is checked again (double-check lock) before comparing jars.
    +   * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}.
    +   * Once jar download is finished, all missing jars are registered in one batch.
    +   * In case if any errors during jars download / registration, these errors are logged.
        *
    -   * @param version local function registry version
    -   * @return true if new jars were registered or local function registry version is different, false otherwise
    +   * During registration local function registry is updated with remote function registry version it is synced with.
    +   * When at least one jar of the missing jars failed to download / register,
    +   * local function registry version are not updated but jars that where successfully downloaded / registered
    +   * are added to local function registry.
    +   *
    +   * If synchronization between remote and local function registry was not needed,
    +   * checks if given registry version matches latest sync version
    +   * to inform if function registry was changed after given version.
    +   *
    +   * @param version remote function registry local function registry was based on
    +   * @return true if remote and local function registries were synchronized after given version
        */
    -  public boolean loadRemoteFunctions(long version) {
    -    List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -    if (!missingJars.isEmpty()) {
    +  public boolean syncWithRemoteRegistry(long version) {
    +    if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
           synchronized (this) {
    -        missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -        if (!missingJars.isEmpty()) {
    -          logger.info("Starting dynamic UDFs lazy-init process.\n" +
    -              "The following jars are going to be downloaded and registered locally: " + missingJars);
    +        long localRegistryVersion = localFunctionRegistry.getVersion();
    +        if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion)) {
    +          DataChangeVersion remoteVersion = new DataChangeVersion();
    +          List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion);
               List<JarScan> jars = Lists.newArrayList();
    -          for (String jarName : missingJars) {
    -            Path binary = null;
    -            Path source = null;
    -            URLClassLoader classLoader = null;
    -            try {
    -              binary = copyJarToLocal(jarName, remoteFunctionRegistry);
    -              source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
    -              URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    -              classLoader = new URLClassLoader(urls);
    -              ScanResult scanResult = scan(classLoader, binary, urls);
    -              localFunctionRegistry.validate(jarName, scanResult);
    -              jars.add(new JarScan(jarName, scanResult, classLoader));
    -            } catch (Exception e) {
    -              deleteQuietlyLocalJar(binary);
    -              deleteQuietlyLocalJar(source);
    -              if (classLoader != null) {
    -                try {
    -                  classLoader.close();
    -                } catch (Exception ex) {
    -                  logger.warn("Problem during closing class loader for {}", jarName, e);
    +          if (!missingJars.isEmpty()) {
    +            logger.info("Starting dynamic UDFs lazy-init process.\n" +
    +                "The following jars are going to be downloaded and registered locally: " + missingJars);
    +            for (String jarName : missingJars) {
    +              Path binary = null;
    +              Path source = null;
    +              URLClassLoader classLoader = null;
    +              try {
    +                binary = copyJarToLocal(jarName, this.remoteFunctionRegistry);
    +                source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry);
    +                URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    +                classLoader = new URLClassLoader(urls);
    +                ScanResult scanResult = scan(classLoader, binary, urls);
    +                localFunctionRegistry.validate(jarName, scanResult);
    +                jars.add(new JarScan(jarName, scanResult, classLoader));
    +              } catch (Exception e) {
    +                deleteQuietlyLocalJar(binary);
    +                deleteQuietlyLocalJar(source);
    +                if (classLoader != null) {
    +                  try {
    +                    classLoader.close();
    +                  } catch (Exception ex) {
    +                    logger.warn("Problem during closing class loader for {}", jarName, ex);
    +                  }
                     }
    +                logger.error("Problem during remote functions load from {}", jarName, e);
                   }
    -              logger.error("Problem during remote functions load from {}", jarName, e);
                 }
               }
    -          if (!jars.isEmpty()) {
    -            localFunctionRegistry.register(jars);
    -            return true;
    -          }
    +          long latestRegistryVersion = jars.size() != missingJars.size() ?
    +              localRegistryVersion : remoteVersion.getVersion();
    +          localFunctionRegistry.register(jars, latestRegistryVersion);
    +          return true;
             }
           }
         }
    +
         return version != localFunctionRegistry.getVersion();
       }
     
       /**
    -   * First finds path to marker file url, otherwise throws {@link JarValidationException}.
    -   * Then scans jar classes according to list indicated in marker files.
    -   * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}.
    -   * This is extremely important for Windows users where system doesn't allow to delete file if it's being used.
    +   * Checks if local function registry should be synchronized with remote function registry.
    +   * If remote function registry version is -1, it means that remote function registry is unreachable
    +   * or is not configured thus we skip synchronization and return false.
    +   * In all other cases synchronization is needed if remote and local function registries versions do not match.
    --- End diff --
    
    What is the initial condition? When I first start Drill with a new install, local version will default to 0. Will the remote version also default to 0? Can there be a case where the remote version is 0 and still contains functions that we need to download? Or, can we be guaranteed that a new ZK registry always has version 1 (not 0)?
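The rule in the `doSyncFunctionRegistries` Javadoc can be sketched as below, which also makes the initial-condition question concrete. This is a minimal illustration, not the PR's code: the class name `RegistrySyncCheck` is hypothetical, and it assumes versions are plain `long` values with -1 meaning "unreachable or not configured", as the Javadoc states. If a new remote registry also started at version 0, a freshly started local registry (version 0) would be treated as already in sync and no jars would be pulled.

```java
final class RegistrySyncCheck {
  // Sentinel for an unreachable or unconfigured remote registry (per the Javadoc).
  static final long UNREACHABLE = -1L;

  private RegistrySyncCheck() {}

  /** Returns true if the local registry should be refreshed from the remote one. */
  static boolean doSyncFunctionRegistries(long remoteVersion, long localVersion) {
    if (remoteVersion == UNREACHABLE) {
      return false; // nothing to sync against
    }
    return remoteVersion != localVersion;
  }

  public static void main(String[] args) {
    System.out.println(doSyncFunctionRegistries(-1, 0)); // false: remote not available
    System.out.println(doSyncFunctionRegistries(3, 2));  // true: versions differ
    // Initial condition: if a new remote registry also starts at 0,
    // a freshly started local registry (version 0) is considered in sync.
    System.out.println(doSyncFunctionRegistries(0, 0));  // false
  }
}
```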


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill issue #701: DRILL-4963: Sync remote and local function registries befo...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/701
  
    General comment. Wasn't the design supposed to be that the lazy lookup would check for the function at both planning ("Calcite") and execution time? The whole point of the lazy init was to avoid per-query syncs (which will be prohibitively expensive for short queries) and to avoid race conditions.
    
    I wonder, is the problem the *way* in which we are checking for functions? The issue in this particular JIRA is a function with the same name as an existing function, but with different signature. Did the original lazy init check only names? Could a fix be to check name & signature?
    
    In general, the question is, can lazy init be preserved, but enhanced to handle the case described in the bug? It seems a shame to move away from it after all the effort that went into getting it (almost) right...
    
    If we switch to relying on syncing, we've got to rethink all the race conditions that were meant to be resolved with lazy init. It seems we'd have to update the design doc to match. I'd really like to see if we can provide a fix that preserves the lazy init design so we keep the performance & concurrency benefits that it provides.
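To make the name-versus-signature question concrete, here is a minimal sketch with a hypothetical in-memory registry (`OverloadRegistry` is not a Drill class). A lookup by name alone reports a hit as soon as any `foo` exists, so a lazy init triggered only by a name miss would never fire for a newly registered overload such as `foo(VARCHAR)`. A lookup keyed on name plus argument types does miss, which is the signal a signature-aware lazy init would need.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class OverloadRegistry {
  // Function name -> set of registered signatures (argument type lists).
  private final Map<String, Set<List<String>>> functions = new HashMap<>();

  void register(String name, List<String> argTypes) {
    functions.computeIfAbsent(name, k -> new HashSet<>()).add(List.copyOf(argTypes));
  }

  /** Name-only check: true if any overload with this name exists. */
  boolean containsName(String name) {
    return functions.containsKey(name);
  }

  /** Name and signature check: true only if this exact overload exists. */
  boolean containsSignature(String name, List<String> argTypes) {
    return functions.getOrDefault(name, Set.of()).contains(argTypes);
  }

  public static void main(String[] args) {
    OverloadRegistry local = new OverloadRegistry();
    local.register("foo", List.of("INT"));
    // The remote registry has since gained foo(VARCHAR), but this node has not loaded it yet.
    List<String> call = List.of("VARCHAR");
    System.out.println(local.containsName("foo"));            // true: name-only lookup hides the gap
    System.out.println(local.containsSignature("foo", call)); // false: would trigger lazy init
  }
}
```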



[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r99650129
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---
    @@ -50,13 +47,56 @@
       private DrillSqlWorker() {
       }
     
    +  /**
    +   * Converts sql query string into query physical plan.
    +   *
    +   * @param context query context
    +   * @param sql sql query
    +   * @return query physical plan
    +   */
       public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlParseException, ValidationException,
           ForemanSetupException {
         return getPlan(context, sql, null);
       }
     
    +  /**
    +   * Converts sql query string into query physical plan.
    +   * In case of any errors (that might occur due to missing function implementation),
    +   * checks if local function registry should be synchronized with remote function registry.
    +   * If sync took place, reloads drill operator table
    +   * (since functions were added to / removed from local function registry)
    +   * and attempts to convert the sql query string into a query physical plan one more time.
    +   *
    +   * @param context query context
    +   * @param sql sql query
    +   * @param textPlan text plan
    +   * @return query physical plan
    +   */
    --- End diff --
    
    Thanks for adding the Javadoc!
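The control flow described in the `getPlan` Javadoc (plan; on failure, sync; if the registry changed, reload and retry once) can be sketched generically. This is not Drill's code: `Planner`, `RegistrySync` and the `reloadOperatorTable` callback are hypothetical stand-ins for the SQL-to-plan conversion, `syncWithRemoteRegistry(version)` and the operator-table reload.

```java
final class PlanWithRetry {
  /** Hypothetical planner: converts SQL to a plan, or throws if a function cannot be resolved. */
  interface Planner {
    String plan(String sql) throws Exception;
  }

  /** Hypothetical hook: returns true if the local registry changed after the given version. */
  interface RegistrySync {
    boolean syncWithRemoteRegistry(long version);
  }

  private PlanWithRetry() {}

  static String getPlan(Planner planner, RegistrySync sync, long registryVersion,
                        Runnable reloadOperatorTable, String sql) throws Exception {
    try {
      return planner.plan(sql);
    } catch (Exception e) {
      // The failure may be caused by a function this node has not loaded yet.
      if (!sync.syncWithRemoteRegistry(registryVersion)) {
        throw e; // nothing changed, so the original error stands
      }
      reloadOperatorTable.run(); // pick up functions added to / removed from the local registry
      return planner.plan(sql);  // second and final attempt
    }
  }

  public static void main(String[] args) throws Exception {
    boolean[] loaded = {false};
    Planner planner = sql -> {
      if (!loaded[0]) {
        throw new IllegalStateException("function not found");
      }
      return "plan for " + sql;
    };
    RegistrySync sync = version -> {
      loaded[0] = true; // pretend the missing jar was downloaded and registered
      return true;
    };
    // Prints "plan for select foo('a')" after one failed attempt and one sync.
    System.out.println(getPlan(planner, sync, 0L, () -> {}, "select foo('a')"));
  }
}
```

Retrying exactly once keeps a genuinely unknown function from looping, while still covering the case where this node was simply behind the remote registry.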



[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/701
  
    It seems that the concept of overloading is itself ambiguous. If I define a function `foo(long)`, but call it with an `int`, we won't get an exact match, will we? So, on every call we'd have to check if there is a new, better, match for `foo()` in the registry. This means a call to ZK for every function in every query where we don't have an exact parameter match. My suspicion is that this will be a performance issue, but we won't know until someone tests it.
    
    I wonder if we should do this fix incrementally. This PR is better than the original, as it does handle overloads. After that, we can do a bit of performance testing to see the impact of checking ZK for the version on every overloaded method. Any performance improvement can be done as separate JIRA and PR.
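The `foo(long)` called with an `int` example can be illustrated with a hypothetical resolver that distinguishes an exact signature match from one that needs an implicit widening cast. Under the per-query check discussed here, only an exact match would let the planner skip the remote-version check. The widening table below (INT to BIGINT, FLOAT to DOUBLE) is a simplified assumption for illustration, not Drill's actual implicit-cast rules.

```java
import java.util.List;
import java.util.Map;

final class OverloadMatch {
  enum Match { EXACT, NEEDS_CAST, NONE }

  // Simplified, assumed widening rules: argument type -> type it may be implicitly cast to.
  private static final Map<String, String> WIDENS_TO = Map.of("INT", "BIGINT", "FLOAT", "DOUBLE");

  private OverloadMatch() {}

  static Match match(List<String> declared, List<String> actual) {
    if (declared.size() != actual.size()) {
      return Match.NONE;
    }
    boolean exact = true;
    for (int i = 0; i < declared.size(); i++) {
      String d = declared.get(i);
      String a = actual.get(i);
      if (d.equals(a)) {
        continue;
      }
      if (d.equals(WIDENS_TO.get(a))) {
        exact = false; // usable, but a better overload might exist in the remote registry
      } else {
        return Match.NONE;
      }
    }
    return exact ? Match.EXACT : Match.NEEDS_CAST;
  }

  /** The remote version has to be consulted whenever there is no exact local match. */
  static boolean mustCheckRemote(Match m) {
    return m != Match.EXACT;
  }

  public static void main(String[] args) {
    Match m = match(List.of("BIGINT"), List.of("INT")); // foo(long) called with an int
    System.out.println(m);                  // NEEDS_CAST
    System.out.println(mustCheckRemote(m)); // true
  }
}
```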



[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/701
  
    @jinfengni , 
    
    As it turns out, we do have a comprehensive design for the original feature and the MVCC revision. The key goals are that a function, once registered, is guaranteed to be available on all Drillbits once it is visible to any particular Foreman. Without this guarantee of consistency, DUDFs become non-deterministic and will cause customer problems.
    
    We do have a "refresh" operation: registering a DUDF updates ZK which sends updates to each node. The problem is the race condition. I register a UDF foo() on node A. I run a query from that same node. If my query happens to hit node B before the ZK notification, the query will fail. Our goal is that such failure cannot happen, hence the need for a "pull" model to augment the ZK-based "push" model.
    
    A manual "update" would have the same issue unless we synchronized the update across all nodes. Also, the only way to ensure that DUDFs are available is to issue an update after adding each DUDF. But, if we did that, we might as well make the DUDF registration itself synchronous across all nodes.
    
    And, of course, the node synchronization does not handle the race condition in which a new node comes up right after a synchronization starts. We'd have to ensure that the new node reads the proper state from ZK. We can do that if we first update ZK, then do synchronization to all nodes, then update ZK with the fact that all nodes are aware of the DUDF. 
    
    Without the "two-phase" process, our new node can come up, learn of the new DUDF and issue a query using the DUDF without some nodes having been notified of the synchronization.
    
    Overall, this is a difficult area. Relying on the well-known semantics of MVCC makes the problems much easier to solve.
    
    So, the question here is whether it is worth checking in this partial solution for 1.10, or just leave the problem open until a complete solution is available.
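One way to read the MVCC idea: the Foreman pins the registry version it planned against, and every node resolves functions against that exact version, pulling it from the remote store if it has not seen it yet, while older versions stay readable for queries already in flight. The sketch below is a single-process illustration under those assumptions; `VersionedRegistry`, `snapshotFor` and the `LongFunction` standing in for a versioned ZK read are all hypothetical, and distribution, failures and garbage collection of old versions are ignored.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.LongFunction;

final class VersionedRegistry {
  /** Immutable view of the registered functions (name -> jar) at one registry version. */
  static final class Snapshot {
    final long version;
    final Map<String, String> functions;

    Snapshot(long version, Map<String, String> functions) {
      this.version = version;
      this.functions = Map.copyOf(functions);
    }
  }

  // Every version this node has seen; older versions stay readable for in-flight queries.
  private final ConcurrentSkipListMap<Long, Snapshot> versions = new ConcurrentSkipListMap<>();
  // Hypothetical: fetches one specific version from the remote (ZK-backed) store.
  private final LongFunction<Snapshot> remote;

  VersionedRegistry(LongFunction<Snapshot> remote) {
    this.remote = remote;
  }

  /** Returns the snapshot a query was planned against, pulling it if this node has not seen it. */
  Snapshot snapshotFor(long pinnedVersion) {
    Snapshot snapshot = versions.computeIfAbsent(pinnedVersion, v -> remote.apply(v));
    if (snapshot == null) {
      throw new IllegalStateException("Registry version " + pinnedVersion + " is not available");
    }
    return snapshot;
  }

  public static void main(String[] args) {
    Map<Long, Snapshot> store = Map.of(
        1L, new Snapshot(1, Map.of()),
        2L, new Snapshot(2, Map.of("foo", "foo.jar")));
    VersionedRegistry nodeB = new VersionedRegistry(store::get);
    // The Foreman planned against version 2, in which foo() exists; node B resolves against the same version.
    System.out.println(nodeB.snapshotFor(2).functions.containsKey("foo")); // true
    // A query pinned to version 1 still sees the registry as it was then.
    System.out.println(nodeB.snapshotFor(1).functions.containsKey("foo")); // false
  }
}
```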



[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/701
  
    @paul-rogers addressed review comments. As agreed, concerns about concurrency and performance will be addressed once MVCC is implemented.



[GitHub] drill issue #701: DRILL-4963: Fix issues with dynamically loaded overloaded ...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on the issue:

    https://github.com/apache/drill/pull/701
  
    @arina-ielchiieva ,
    
    Regarding your 3rd comment, we can probably discuss further once you have the design. I would think we could process the "refresh function registry" command as a query (or more like CTAS, since it would update something); if one drillbit fails, fail the command with a proper error message. The user can decide what to do: either re-run the command after addressing the cause of failure, or run the query knowing it might hit problems.
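The suggestion of treating "refresh function registry" as a command that fails with an error message if any Drillbit fails could look roughly like the sketch below. The `Drillbit` interface and its `refreshFunctionRegistry()` method are hypothetical; the point is only that per-node failures are collected and reported together, so the user knows which nodes to look at before re-running the command.

```java
import java.util.ArrayList;
import java.util.List;

final class RefreshCommand {
  /** Hypothetical handle to a remote Drillbit. */
  interface Drillbit {
    String address();

    void refreshFunctionRegistry() throws Exception;
  }

  private RefreshCommand() {}

  /** Asks every Drillbit to refresh; fails the whole command if any of them fails. */
  static void refreshAll(List<Drillbit> drillbits) {
    List<String> failures = new ArrayList<>();
    for (Drillbit bit : drillbits) {
      try {
        bit.refreshFunctionRegistry();
      } catch (Exception e) {
        failures.add(bit.address() + ": " + e.getMessage());
      }
    }
    if (!failures.isEmpty()) {
      throw new IllegalStateException("Function registry refresh failed on "
          + failures.size() + " of " + drillbits.size() + " Drillbits: " + failures);
    }
  }

  /** Test double: a Drillbit that either refreshes successfully or fails. */
  static Drillbit bit(String addr, boolean healthy) {
    return new Drillbit() {
      @Override
      public String address() {
        return addr;
      }

      @Override
      public void refreshFunctionRegistry() throws Exception {
        if (!healthy) {
          throw new Exception("connection refused");
        }
      }
    };
  }

  public static void main(String[] args) {
    try {
      refreshAll(List.of(bit("node-a", true), bit("node-b", false)));
    } catch (IllegalStateException e) {
      // Function registry refresh failed on 1 of 2 Drillbits: [node-b: connection refused]
      System.out.println(e.getMessage());
    }
  }
}
```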




[GitHub] drill pull request #701: DRILL-4963: Fix issues with dynamically loaded over...

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/701#discussion_r102920190
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---
    @@ -260,76 +293,101 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() {
       }
     
       /**
    -   * Attempts to load and register functions from remote function registry.
    -   * First checks if there is no missing jars.
    -   * If yes, enters synchronized block to prevent other loading the same jars.
    -   * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock).
    -   * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
    -   * Jar registration timestamp represented in milliseconds is used as suffix.
    -   * Then registers all jars at the same time. Returns true when finished.
    -   * In case if any errors during jars coping or registration, logs errors and proceeds.
    +   * The purpose of this method is to synchronize the remote and local function registries if needed
    +   * and to report whether the function registry changed after the given version.
        *
    -   * If no missing jars are found, checks current local registry version.
    -   * Returns false if versions match, true otherwise.
    +   * To make synchronization as lightweight as possible, first only the versions of both registries are checked
    +   * without any locking. If synchronization is needed, enters a synchronized block to prevent others from loading the same jars.
    +   * The need for synchronization is checked again (double-checked locking) before comparing jars.
    +   * If any missing jars are found, they are downloaded to the local udf area and each is wrapped into a {@link JarScan}.
    +   * Once the download is finished, all missing jars are registered in one batch.
    +   * Any errors during jar download / registration are logged.
        *
    -   * @param version local function registry version
    -   * @return true if new jars were registered or local function registry version is different, false otherwise
    +   * During registration, the local function registry is updated with the remote function registry version it was synced with.
    +   * When at least one of the missing jars fails to download / register,
    +   * the local function registry version is not updated, but jars that were successfully downloaded / registered
    +   * are still added to the local function registry.
    +   *
    +   * If synchronization between the remote and local function registries was not needed,
    +   * checks whether the given registry version matches the latest sync version
    +   * to report whether the function registry changed after the given version.
    +   *
    +   * @param version remote function registry version the local function registry was based on
    +   * @return true if remote and local function registries were synchronized after given version
        */
    -  public boolean loadRemoteFunctions(long version) {
    -    List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -    if (!missingJars.isEmpty()) {
    +  public boolean syncWithRemoteRegistry(long version) {
    +    if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
           synchronized (this) {
    -        missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
    -        if (!missingJars.isEmpty()) {
    -          logger.info("Starting dynamic UDFs lazy-init process.\n" +
    -              "The following jars are going to be downloaded and registered locally: " + missingJars);
    +        long localRegistryVersion = localFunctionRegistry.getVersion();
    +        if (doSyncFunctionRegistries(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion)) {
    +          DataChangeVersion remoteVersion = new DataChangeVersion();
    +          List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion);
               List<JarScan> jars = Lists.newArrayList();
    -          for (String jarName : missingJars) {
    -            Path binary = null;
    -            Path source = null;
    -            URLClassLoader classLoader = null;
    -            try {
    -              binary = copyJarToLocal(jarName, remoteFunctionRegistry);
    -              source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
    -              URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    -              classLoader = new URLClassLoader(urls);
    -              ScanResult scanResult = scan(classLoader, binary, urls);
    -              localFunctionRegistry.validate(jarName, scanResult);
    -              jars.add(new JarScan(jarName, scanResult, classLoader));
    -            } catch (Exception e) {
    -              deleteQuietlyLocalJar(binary);
    -              deleteQuietlyLocalJar(source);
    -              if (classLoader != null) {
    -                try {
    -                  classLoader.close();
    -                } catch (Exception ex) {
    -                  logger.warn("Problem during closing class loader for {}", jarName, e);
    +          if (!missingJars.isEmpty()) {
    +            logger.info("Starting dynamic UDFs lazy-init process.\n" +
    +                "The following jars are going to be downloaded and registered locally: " + missingJars);
    +            for (String jarName : missingJars) {
    +              Path binary = null;
    +              Path source = null;
    +              URLClassLoader classLoader = null;
    +              try {
    +                binary = copyJarToLocal(jarName, this.remoteFunctionRegistry);
    +                source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry);
    +                URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
    +                classLoader = new URLClassLoader(urls);
    +                ScanResult scanResult = scan(classLoader, binary, urls);
    +                localFunctionRegistry.validate(jarName, scanResult);
    +                jars.add(new JarScan(jarName, scanResult, classLoader));
    +              } catch (Exception e) {
    +                deleteQuietlyLocalJar(binary);
    +                deleteQuietlyLocalJar(source);
    +                if (classLoader != null) {
    +                  try {
    +                    classLoader.close();
    +                  } catch (Exception ex) {
    +                    logger.warn("Problem during closing class loader for {}", jarName, ex);
    +                  }
                     }
    +                logger.error("Problem during remote functions load from {}", jarName, e);
                   }
    -              logger.error("Problem during remote functions load from {}", jarName, e);
                 }
               }
    -          if (!jars.isEmpty()) {
    -            localFunctionRegistry.register(jars);
    -            return true;
    -          }
    +          long latestRegistryVersion = jars.size() != missingJars.size() ?
    +              localRegistryVersion : remoteVersion.getVersion();
    +          localFunctionRegistry.register(jars, latestRegistryVersion);
    +          return true;
             }
           }
         }
    +
         return version != localFunctionRegistry.getVersion();
       }
     
       /**
    -   * First finds path to marker file url, otherwise throws {@link JarValidationException}.
    -   * Then scans jar classes according to list indicated in marker files.
    -   * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}.
    -   * This is extremely important for Windows users where system doesn't allow to delete file if it's being used.
    +   * Checks if local function registry should be synchronized with remote function registry.
    +   * If remote function registry version is -1, it means that remote function registry is unreachable
    +   * or is not configured thus we skip synchronization and return false.
    +   * In all other cases synchronization is needed if remote and local function registries versions do not match.
        *
    -   * @param classLoader unique class loader for jar
    -   * @param path local path to jar
    -   * @param urls urls associated with the jar (ex: binary and source)
    -   * @return scan result of packages, classes, annotations found in jar
    +   * @param remoteVersion remote function registry version
    +   * @param localVersion local function registry version
    +   * @return true if local registry should be refreshed, false otherwise
        */
    +  private boolean doSyncFunctionRegistries(long remoteVersion, long localVersion) {
    --- End diff --
    
    Agree. Done.
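The synchronization flow documented for `syncWithRemoteRegistry` (unlocked version check, double check under the lock, batch registration, and keeping the old version when any jar fails) can be sketched with simplified stand-ins. Everything here is hypothetical: jars are plain strings, the `download` predicate replaces the copy / scan / validate steps, and the remote registry is two fixed fields. It is meant to show the version bookkeeping only, not Drill's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class RegistrySyncer {
  private final long remoteVersion;            // stands in for the remote registry version
  private final List<String> remoteJars;       // stands in for the remote registry content
  private volatile long localVersion = 0;      // remote version the local registry last fully synced with
  private final Set<String> localJars = new LinkedHashSet<>();
  private final Predicate<String> download;    // hypothetical: returns false if a jar fails to load

  RegistrySyncer(long remoteVersion, List<String> remoteJars, Predicate<String> download) {
    this.remoteVersion = remoteVersion;
    this.remoteJars = remoteJars;
    this.download = download;
  }

  long localVersion() {
    return localVersion;
  }

  private static boolean needsSync(long remote, long local) {
    return remote != -1 && remote != local;
  }

  /** Returns true if the local registry changed after the given version. */
  boolean syncWithRemoteRegistry(long version) {
    if (needsSync(remoteVersion, localVersion)) {        // cheap check, no lock
      synchronized (this) {
        long localBefore = localVersion;
        if (needsSync(remoteVersion, localBefore)) {     // double check under the lock
          List<String> missing = new ArrayList<>(remoteJars);
          missing.removeAll(localJars);
          List<String> loaded = new ArrayList<>();
          for (String jar : missing) {
            if (download.test(jar)) {
              loaded.add(jar);
            }
          }
          localJars.addAll(loaded);                      // register successes in one batch
          // Advance the version only if every missing jar loaded; otherwise retry next time.
          localVersion = loaded.size() == missing.size() ? remoteVersion : localBefore;
          return true;
        }
      }
    }
    return version != localVersion;
  }

  public static void main(String[] args) {
    // bad.jar fails to load, so the local version stays at 0 even though good.jar is registered.
    RegistrySyncer partial = new RegistrySyncer(5, List.of("good.jar", "bad.jar"), j -> !j.equals("bad.jar"));
    System.out.println(partial.syncWithRemoteRegistry(0)); // true
    System.out.println(partial.localVersion());            // 0
    RegistrySyncer full = new RegistrySyncer(5, List.of("good.jar"), j -> true);
    System.out.println(full.syncWithRemoteRegistry(0));    // true
    System.out.println(full.localVersion());               // 5
    System.out.println(full.syncWithRemoteRegistry(5));    // false: already in sync
  }
}
```

Keeping the old version after a partial failure means the next call still sees a version mismatch and retries only the jars that are still missing.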

