You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2023/05/02 13:01:41 UTC

[drill] branch master updated: DRILL-8426: Fix endless retrying zk set data for a large query (#2796)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ce0c88ffd DRILL-8426: Fix endless retrying zk set data for a large query (#2796)
5ce0c88ffd is described below

commit 5ce0c88ffd6e50245da5ef30cf47a321acaa12fb
Author: Maksym Rymar <ri...@gmail.com>
AuthorDate: Tue May 2 16:01:31 2023 +0300

    DRILL-8426: Fix endless retrying zk set data for a large query (#2796)
---
 .../drill/exec/coord/zk/ZookeeperClient.java       | 29 +++++++++++++++++++---
 .../apache/drill/exec/work/foreman/Foreman.java    |  3 +--
 .../drill/exec/work/foreman/QueryManager.java      | 10 ++++++--
 .../java-exec/src/main/resources/drill-module.conf |  2 +-
 4 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
index fee607d2ab..ec98453e05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -33,10 +33,13 @@ import org.apache.drill.common.collections.ImmutableEntry;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.VersionMismatchException;
 import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+import org.apache.jute.BinaryInputArchive;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A namespace aware Zookeeper client.
@@ -46,10 +49,12 @@ import org.apache.zookeeper.data.Stat;
  * Note that instance of this class holds onto resources that must be released via {@code #close()}.
  */
 public class ZookeeperClient implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(ZookeeperClient.class);
   private final CuratorFramework curator;
   private final String root;
   private final PathChildrenCache cache;
   private final CreateMode mode;
+  private final int MAX_DATA_LENGTH = BinaryInputArchive.maxBuffer;
 
   public ZookeeperClient(final CuratorFramework curator, final String root, final CreateMode mode) {
     this.curator = Preconditions.checkNotNull(curator, "curator is required");
@@ -241,6 +246,7 @@ public class ZookeeperClient implements AutoCloseable {
    *
    * @param path  target path
    * @param data  data to store
+   * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
    */
   public void put(final String path, final byte[] data) {
     put(path, data, null);
@@ -248,9 +254,9 @@ public class ZookeeperClient implements AutoCloseable {
 
   /**
    * Puts the given byte sequence into the given path.
-   *
+   * <p>
    * If path does not exists, this call creates it.
-   *
+   * <p>
    * If version holder is not null and path already exists, passes given version for comparison.
    * Zookeeper maintains stat structure that holds version number which increases each time znode data change is performed.
    * If we pass version that doesn't match the actual version of the data,
@@ -258,13 +264,19 @@ public class ZookeeperClient implements AutoCloseable {
    * We catch such exception and re-throw it as {@link VersionMismatchException}.
    * Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
    *
-   * @param path  target path
-   * @param data  data to store
+   * @param path    target path
+   * @param data    data to store
    * @param version version holder
+   * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
    */
   public void put(final String path, final byte[] data, DataChangeVersion version) {
     Preconditions.checkNotNull(path, "path is required");
     Preconditions.checkNotNull(data, "data is required");
+    if (data.length > MAX_DATA_LENGTH) {
+      throw new IllegalArgumentException(
+        String.format("Can't put this data. Data size %d bytes is bigger than jute.maxbuffer %d", data.length, MAX_DATA_LENGTH)
+      );
+    }
 
     final String target = PathUtils.join(root, path);
     try {
@@ -297,6 +309,8 @@ public class ZookeeperClient implements AutoCloseable {
     } catch (final VersionMismatchException e) {
       throw e;
     } catch (final Exception e) {
+      logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " +
+        "jute.maxbuffer value corresponds to the zookeeper server value.", data.length, MAX_DATA_LENGTH);
       throw new DrillRuntimeException("unable to put ", e);
     }
   }
@@ -311,6 +325,11 @@ public class ZookeeperClient implements AutoCloseable {
   public byte[] putIfAbsent(final String path, final byte[] data) {
     Preconditions.checkNotNull(path, "path is required");
     Preconditions.checkNotNull(data, "data is required");
+    if (data.length > MAX_DATA_LENGTH) {
+      throw new IllegalArgumentException(
+        String.format("Can't put this data. Data size %d bytes is bigger than jute.maxbuffer %d", data.length, MAX_DATA_LENGTH)
+      );
+    }
 
     final String target = PathUtils.join(root, path);
     try {
@@ -323,6 +342,8 @@ public class ZookeeperClient implements AutoCloseable {
       }
       return curator.getData().forPath(target);
     } catch (final Exception e) {
+      logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " +
+        "jute.maxbuffer value corresponds to the zookeeper server value.", data.length, MAX_DATA_LENGTH);
       throw new DrillRuntimeException("unable to put ", e);
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 1ddc1150cc..202163533d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -252,9 +252,8 @@ public class Foreman implements Runnable {
     }
 
     queryText = queryRequest.getPlan();
-    queryStateProcessor.moveToState(QueryState.PLANNING, null);
-
     try {
+      queryStateProcessor.moveToState(QueryState.PLANNING, null);
       injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);
 
       // convert a run query request into action
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index e99ba5a402..10679b061c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -297,8 +297,14 @@ public class QueryManager implements AutoCloseable {
       case STARTING:
       case RUNNING:
       case CANCELLATION_REQUESTED:
-        runningProfileStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
-        inTransientStore = true;
+        try {
+          runningProfileStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
+          inTransientStore = true;
+        } catch (IllegalArgumentException e) {
+          throw UserException.executionError(e)
+            .message("Failed to persist query info. Query length is too big.", e)
+            .build(logger);
+        }
         break;
       case COMPLETED:
       case CANCELED:
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 488d8af088..ff817b5e4b 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -142,7 +142,7 @@ drill.exec: {
     refresh: 500,
     timeout: 5000,
     retry: {
-      count: 7200,
+      count: 15,
       delay: 500
     },
     apply_secure_acl: false,