You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2023/05/02 13:01:41 UTC
[drill] branch master updated: DRILL-8426: Fix endless retrying zk set data for a large query (#2796)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 5ce0c88ffd DRILL-8426: Fix endless retrying zk set data for a large query (#2796)
5ce0c88ffd is described below
commit 5ce0c88ffd6e50245da5ef30cf47a321acaa12fb
Author: Maksym Rymar <ri...@gmail.com>
AuthorDate: Tue May 2 16:01:31 2023 +0300
DRILL-8426: Fix endless retrying zk set data for a large query (#2796)
---
.../drill/exec/coord/zk/ZookeeperClient.java | 29 +++++++++++++++++++---
.../apache/drill/exec/work/foreman/Foreman.java | 3 +--
.../drill/exec/work/foreman/QueryManager.java | 10 ++++++--
.../java-exec/src/main/resources/drill-module.conf | 2 +-
4 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
index fee607d2ab..ec98453e05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -33,10 +33,13 @@ import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A namespace aware Zookeeper client.
@@ -46,10 +49,12 @@ import org.apache.zookeeper.data.Stat;
* Note that instance of this class holds onto resources that must be released via {@code #close()}.
*/
public class ZookeeperClient implements AutoCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(ZookeeperClient.class);
private final CuratorFramework curator;
private final String root;
private final PathChildrenCache cache;
private final CreateMode mode;
+ private final int MAX_DATA_LENGTH = BinaryInputArchive.maxBuffer;
public ZookeeperClient(final CuratorFramework curator, final String root, final CreateMode mode) {
this.curator = Preconditions.checkNotNull(curator, "curator is required");
@@ -241,6 +246,7 @@ public class ZookeeperClient implements AutoCloseable {
*
* @param path target path
* @param data data to store
+ * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
*/
public void put(final String path, final byte[] data) {
put(path, data, null);
@@ -248,9 +254,9 @@ public class ZookeeperClient implements AutoCloseable {
/**
* Puts the given byte sequence into the given path.
- *
+ * <p>
* If path does not exists, this call creates it.
- *
+ * <p>
* If version holder is not null and path already exists, passes given version for comparison.
* Zookeeper maintains stat structure that holds version number which increases each time znode data change is performed.
* If we pass version that doesn't match the actual version of the data,
@@ -258,13 +264,19 @@ public class ZookeeperClient implements AutoCloseable {
* We catch such exception and re-throw it as {@link VersionMismatchException}.
* Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
*
- * @param path target path
- * @param data data to store
+ * @param path target path
+ * @param data data to store
* @param version version holder
+ * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
*/
public void put(final String path, final byte[] data, DataChangeVersion version) {
Preconditions.checkNotNull(path, "path is required");
Preconditions.checkNotNull(data, "data is required");
+ if (data.length > MAX_DATA_LENGTH) {
+ throw new IllegalArgumentException(
+ String.format("Can't put this data. Data size %d bytes is bigger than jute.maxbuffer %d", data.length, MAX_DATA_LENGTH)
+ );
+ }
final String target = PathUtils.join(root, path);
try {
@@ -297,6 +309,8 @@ public class ZookeeperClient implements AutoCloseable {
} catch (final VersionMismatchException e) {
throw e;
} catch (final Exception e) {
+ logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " +
+ "jute.maxbuffer value corresponds to the zookeeper server value.", data.length, MAX_DATA_LENGTH);
throw new DrillRuntimeException("unable to put ", e);
}
}
@@ -311,6 +325,11 @@ public class ZookeeperClient implements AutoCloseable {
public byte[] putIfAbsent(final String path, final byte[] data) {
Preconditions.checkNotNull(path, "path is required");
Preconditions.checkNotNull(data, "data is required");
+ if (data.length > MAX_DATA_LENGTH) {
+ throw new IllegalArgumentException(
+ String.format("Can't put this data. Data size %d bytes is bigger than jute.maxbuffer %d", data.length, MAX_DATA_LENGTH)
+ );
+ }
final String target = PathUtils.join(root, path);
try {
@@ -323,6 +342,8 @@ public class ZookeeperClient implements AutoCloseable {
}
return curator.getData().forPath(target);
} catch (final Exception e) {
+ logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " +
+ "jute.maxbuffer value corresponds to the zookeeper server value.", data.length, MAX_DATA_LENGTH);
throw new DrillRuntimeException("unable to put ", e);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 1ddc1150cc..202163533d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -252,9 +252,8 @@ public class Foreman implements Runnable {
}
queryText = queryRequest.getPlan();
- queryStateProcessor.moveToState(QueryState.PLANNING, null);
-
try {
+ queryStateProcessor.moveToState(QueryState.PLANNING, null);
injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);
// convert a run query request into action
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index e99ba5a402..10679b061c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -297,8 +297,14 @@ public class QueryManager implements AutoCloseable {
case STARTING:
case RUNNING:
case CANCELLATION_REQUESTED:
- runningProfileStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile.
- inTransientStore = true;
+ try {
+ runningProfileStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile.
+ inTransientStore = true;
+ } catch (IllegalArgumentException e) {
+ throw UserException.executionError(e)
+ .message("Failed to persist query info. Query length is too big.", e)
+ .build(logger);
+ }
break;
case COMPLETED:
case CANCELED:
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 488d8af088..ff817b5e4b 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -142,7 +142,7 @@ drill.exec: {
refresh: 500,
timeout: 5000,
retry: {
- count: 7200,
+ count: 15,
delay: 500
},
apply_secure_acl: false,