Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/10/09 04:41:24 UTC

[GitHub] [druid] paul-rogers opened a new pull request, #13196: Async reads for JDBC

paul-rogers opened a new pull request, #13196:
URL: https://github.com/apache/druid/pull/13196

   Prevents JDBC timeouts on long queries by returning empty batches when a batch fetch takes too long. Uses an async model to run the result fetch concurrently with JDBC requests.
   
   ### Release Notes
   
   Druid's Avatica-based JDBC handler is not the preferred way to run long-running queries in Druid.
   
   Druid supports the Avatica JDBC driver. Avatica uses HTTP requests to communicate with the server. When using JDBC with long-running queries, the HTTP request can time out, producing an error. This PR uses an internal feature of JDBC to avoid timeouts by returning "empty batches" of rows when Druid takes too long to return the actual rows. The JDBC client automatically requests more rows, resulting in Druid queries running asynchronously with JDBC requests. The result is that JDBC queries no longer time out.
   
   This feature is enabled by default with a timeout of 5 seconds. To change the timeout, set the `druid.sql.avatica.fetchTimeoutMs` property to the new value, in milliseconds. Druid enforces a minimum of 1000 milliseconds to prevent hammering of the Broker.
   
   ### Description
   
   Druid's Avatica handler implementation already uses async execution to open and close the "yielder" for a query. This PR extends the idea by using the same executor for fetches. The fetch state (yielder, fetch offset) resides in a new `ResultFetcher` class that is invoked on each request to get results.
   
   The async logic is simple:
   
   * If a future from the previous fetch exists, use it.
   * Otherwise, submit the fetcher to the existing `ExecutorService`, which returns a future.
   * Wait for the future to finish, but only up to the fetch timeout.
   * If the wait times out, save the future and return an empty batch.
   * If the results arrive in time, return them.
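   The five steps above can be sketched in Java roughly as follows. This is a minimal illustration, not the actual `DruidJdbcResultSet` code. `Frame` is a hypothetical stand-in for Avatica's `Meta.Frame`, and `AsyncFetcher`/`nextFrame` are hypothetical names. The offset bookkeeping and state checks that the real handler performs are omitted.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   /** Hypothetical stand-in for Avatica's Meta.Frame: a batch of rows plus an EOF flag. */
   final class Frame {
     final long offset;
     final boolean done;
     final List<Object> rows;

     Frame(long offset, boolean done, List<Object> rows) {
       this.offset = offset;
       this.done = done;
       this.rows = rows;
     }
   }

   /** Hypothetical sketch of the "wait up to the fetch timeout, else return an empty batch" loop. */
   final class AsyncFetcher {
     private final ExecutorService executor;
     private final long fetchTimeoutMs;
     // Fetch left over from a request that timed out. The next request reuses it.
     private Future<Frame> pending;

     AsyncFetcher(ExecutorService executor, long fetchTimeoutMs) {
       this.executor = executor;
       this.fetchTimeoutMs = fetchTimeoutMs;
     }

     synchronized Frame nextFrame(long offset, Callable<Frame> fetch) {
       // Steps 1 and 2: reuse the in-flight fetch if there is one, otherwise start a new one.
       // Note: a pending fetch is reused even if a different callable is passed in.
       final Future<Frame> future;
       if (pending != null) {
         future = pending;
         pending = null;
       } else {
         future = executor.submit(fetch);
       }
       try {
         // Steps 3 and 5: wait, but only up to the fetch timeout. Return rows if they arrive.
         return future.get(fetchTimeoutMs, TimeUnit.MILLISECONDS);
       } catch (TimeoutException e) {
         // Step 4: remember the fetch and return an empty, not-done batch.
         pending = future;
         return new Frame(offset, false, Collections.emptyList());
       } catch (ExecutionException e) {
         // The fetch itself failed: surface the underlying cause.
         throw new RuntimeException(e.getCause());
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       }
     }
   }
   ```

   With a fetch that blocks longer than the timeout, the first call returns an empty batch and the second call picks up the same future instead of starting a second fetch.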
   
   The close operation is modified to wait for any in-flight fetch to return before shutting down the result set. A `ResultFetcherFactory` creates the fetcher for each query. The factory is needed only to allow introducing artificial delays for testing.
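   Below is a minimal sketch of the close-side change. It assumes the result source can be modeled as a `java.io.Closeable` (Druid's real code closes a yielder) and that the in-flight fetch is a plain `Future`. `ResultSetCloser` is a hypothetical name. The point is ordering: the background fetch must finish before the source it reads from is released.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.util.concurrent.CancellationException;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;

   /** Hypothetical helper: close a result source only after any in-flight fetch is done. */
   final class ResultSetCloser {
     static void close(Future<?> inFlightFetch, Closeable resultSource) {
       if (inFlightFetch != null) {
         try {
           // Block until the background fetch finishes. Its result is discarded.
           inFlightFetch.get();
         } catch (ExecutionException | CancellationException e) {
           // The fetch failed or was cancelled. We are shutting down anyway.
         } catch (InterruptedException e) {
           // Restore the interrupt flag. This sketch still closes the source.
           Thread.currentThread().interrupt();
         }
       }
       try {
         resultSource.close();
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
     }
   }
   ```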
   
   The Avatica handler tests are cleaned up to eliminate some redundancy, which allowed the async tests to be written with less copy/paste than the existing code would have required.
   
   <hr>
   
   This PR has:
   - [X] been self-reviewed.
      - [X] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [X] added documentation for new or modified features or behaviors.
   - [X] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [X] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
   - [X] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] paul-rogers commented on a diff in pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r993892709


##########
docs/configuration/index.md:
##########
@@ -1888,6 +1888,7 @@ The Druid SQL server is configured through the following properties on the Broke
 |`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC client `Statement.setFetchSize` method. The value for this property must greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than `minRowsPerFrame`, Druid uses the minimum value of the two. For handling queries which produce results with a large number of rows, you can increase this value to reduce the number of fetches required to completely transfer the result set.|100|
 |`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4|
 |`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
+|`druid.sql.avatica.fetchTimeoutMs`|Avatica fetch timeout, in milliseconds. When a request for the next batch of data takes longer than this time, Druid returns an empty result set, causing the client to poll again. This avoids HTTP timeouts for long-running queries. |5000|

Review Comment:
   This is a tricky one! The value should be less than the minimum HTTP timeout anywhere in the request chain: client, proxy server, gateway/firewall, Router and Broker. It is hard to know these. The default is 5 sec, which should be well under any reasonable timeout, without impacting performance. (The timeout happens only when the query is slow, so we're outside the high-performance realm already.)
   
   Added a comment that the default of 5 sec. should be fine in most cases.





[GitHub] [druid] paul-rogers commented on pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13196:
URL: https://github.com/apache/druid/pull/13196#issuecomment-1272466167

   @FrankChen021, you get the Fastest Review Ever award! Thanks!
   
   > I always thought the HTTP timeout is a client behavior, and didn't know that it can be controlled by the server.
   > And I'm curious: is such an internal feature implemented only by the Avatica JDBC driver? Could you point me to some material to learn more about it?
   
   You are right: the client, any proxies, and the Router each impose their own timeout, and the shortest one wins. The challenge arises when that HTTP timeout is shorter than the time Druid needs to compute the next batch of results. With the regular REST API via the `/sql` endpoint, we have to compute and return all the results within this timeout. This is usually not a problem because Druid is intended for simple queries: those that return in 100ms with a few hundred or thousand rows, and that exploit Druid's time partitioning, filters, etc.
   
   The challenge is with the occasional "BI" style query that, for whatever reason, returns many rows, or takes a long time to compute. Maybe you've got billions of rows and need to find the one event, anywhere in that set, that represents access to some resource. There is nothing for it but to scan all the data, and that might take a while depending on the filter used. Or, maybe someone wants to grab several million rows of data to feed into an ML model. And so on.
   
   Normally you would not want to run such a query on Druid: that's not what Druid is designed for. But sometimes you just gotta do what you gotta do. In this case, if your query takes a minute to run and one of the network elements in the chain has a 30-second timeout, you can't successfully run the query as a single request/response exchange.
   
   But JDBC is different. It works by sending multiple requests for each query, each returning some number of rows, generally in the thousands. The client keeps asking for more batches until the server returns EOF. So, for a long-running query where each batch of records can be computed in less time than the HTTP timeout, the query would succeed.
   
   In a big query, however, most of the time goes into the work done before we get the first row. So we want to generalize the above idea. JDBC allows us to return a batch of 0 rows. If we do, the client just turns around and requests another batch, until it gets rows or is told that EOF has been reached. To be clear, this polling is a feature of the Avatica client. The application just iterates through a `ResultSet`, blissfully ignorant of the goings-on under the covers.
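   To make the polling concrete, here is a simplified model of a client loop that handles empty batches. It is not Avatica's real client code. `BatchSource`, `Batch` and `PollingClient` are hypothetical types, and a real client also has to handle errors and cancellation. The key point is that a batch that is empty but not marked done simply triggers another request at the same offset.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical server interface: returns up to maxRows rows starting at offset. */
   interface BatchSource {
     Batch fetch(long offset, int maxRows);
   }

   /** Hypothetical batch: some rows plus a flag saying whether EOF was reached. */
   final class Batch {
     final List<Object> rows;
     final boolean done;

     Batch(List<Object> rows, boolean done) {
       this.rows = rows;
       this.done = done;
     }
   }

   /** Simplified model of the client side: keep asking until the server says "done". */
   final class PollingClient {
     static List<Object> readAll(BatchSource source, int fetchSize) {
       List<Object> all = new ArrayList<>();
       long offset = 0;
       while (true) {
         Batch batch = source.fetch(offset, fetchSize);
         all.addAll(batch.rows);
         offset += batch.rows.size();
         if (batch.done) {
           return all;
         }
         // Not done: request the next batch. If this batch was empty, the offset
         // is unchanged, so the client effectively asks for the same batch again.
       }
     }
   }
   ```

   From the application's point of view nothing changes: it sees one stream of rows, however many empty batches were exchanged along the way.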
   
   That's what the async mechanism in this PR does for us. We tell Druid to go get a batch of rows. If this takes longer than the *fetch timeout*, we return an empty batch and remember that Druid is busily working away to get us a batch. Maybe it will be ready next time. If so, we return it. If not, we again return an empty batch and the whole thing repeats.
   
   For this to work, the fetch timeout has to be less than the HTTP timeout. Given jitter, we might use the Nyquist rule and set the fetch timeout to half of the HTTP timeout. It doesn't have to be exact: any value less than half the network timeout should be fine.
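   The rule of thumb reduces to a small calculation. This is only a sketch. `FetchTimeoutAdvisor` is a hypothetical helper, not a Druid API, and its 1000 ms floor mirrors the minimum that the release notes say Druid enforces. If the shortest timeout in the chain is under 2 seconds, the floor wins and the half-timeout margin no longer holds.

   ```java
   /** Hypothetical helper: suggest a fetch timeout from the HTTP timeouts in the request chain. */
   final class FetchTimeoutAdvisor {
     // Floor mirroring the minimum fetch timeout Druid enforces.
     static final long MIN_FETCH_TIMEOUT_MS = 1_000;

     /** Returns half of the shortest HTTP timeout (client, proxy, gateway, Router, Broker). */
     static long suggest(long... httpTimeoutsMs) {
       if (httpTimeoutsMs.length == 0) {
         throw new IllegalArgumentException("need at least one HTTP timeout");
       }
       long shortest = Long.MAX_VALUE;
       for (long t : httpTimeoutsMs) {
         shortest = Math.min(shortest, t);
       }
       // Half the shortest timeout leaves headroom for jitter. Never go below the floor.
       return Math.max(MIN_FETCH_TIMEOUT_MS, shortest / 2);
     }
   }
   ```

   For example, with a 30-second proxy timeout, a 60-second client timeout and a 300-second Broker timeout, `suggest(30_000L, 60_000L, 300_000L)` returns 15000 ms. The 5000 ms default is comfortably below that.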
   
   Voila! We've decoupled query run time from network timeouts by running the query (or at least each batch fetch) asynchronously with the Avatica REST HTTP requests. So, you see, we don't set the HTTP timeout, we just work around it by *always* returning within the timeout, knowing that the client will do the right thing if we return 0 rows to stay within the timeout.




[GitHub] [druid] paul-rogers commented on pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13196:
URL: https://github.com/apache/druid/pull/13196#issuecomment-1275351660

   @FrankChen021, thanks for taking a look at the PR and for working out the proper tag. You suggest:
   
   > Maybe we need a tag Area - JDBC to tag any issues about our server side processing related to JDBC.
   
   When used for issues, it is hard for a user to know that an issue is in JDBC vs. the other areas you mention. We developers know, but JDBC is a rather small area. Maybe `Area - Clients` or `Area - API`, to encompass JDBC, REST and any other APIs we might invent? But really, any of those tags is fine if it helps us sort PRs in a useful way.




[GitHub] [druid] zachjsh commented on a diff in pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
zachjsh commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r991605412


##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset);
+    Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), TimeUnit.MILLISECONDS);

Review Comment:
   Should we keep the existing behavior by default, and enable this new behavior through non-default configuration? 





[GitHub] [druid] LakshSingla commented on a diff in pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
LakshSingla commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r992943472


##########
docs/configuration/index.md:
##########
@@ -1888,6 +1888,7 @@ The Druid SQL server is configured through the following properties on the Broke
 |`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC client `Statement.setFetchSize` method. The value for this property must greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than `minRowsPerFrame`, Druid uses the minimum value of the two. For handling queries which produce results with a large number of rows, you can increase this value to reduce the number of fetches required to completely transfer the result set.|100|
 |`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4|
 |`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
+|`druid.sql.avatica.fetchTimeoutMs`|Avatica fetch timeout, in milliseconds. When a request for the next batch of data takes longer than this time, Druid returns an empty result set, causing the client to poll again. This avoids HTTP timeouts for long-running queries. |5000|

Review Comment:
   Can there be a suggestion for a good value that someone configuring this property could set based on their query patterns? For example, something like `Set it to something less than the estimated time between two consecutive fetches`.





[GitHub] [druid] zachjsh commented on a diff in pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
zachjsh commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r991593162


##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset);
+    Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), TimeUnit.MILLISECONDS);
+      if (result.done) {
         state = State.DONE;
       }
-
-      return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+      nextFetchOffset = fetcher.offset;
+      return result;
     }
-    catch (Throwable t) {
-      throw closeAndPropagateThrowable(t);
+    catch (CancellationException | InterruptedException e) {
+      // Consider this a failure.
+      throw closeAndPropagateThrowable(e);
+    }
+    catch (ExecutionException e) {
+      // Fetch threw an error. Unwrap it.
+      throw closeAndPropagateThrowable(e.getCause());
+    }
+    catch (TimeoutException e) {
+      fetchFuture = future;
+      // Wait timed out. Return 0 rows: the client will try again later.
+      // We'll wait again on this same fetch next time.
+      // Note that when the next fetch request comes, it will use the batch
+      // size set here: any change in size will be ignored for the in-flight batch.
+      // Changing batch size mid-query is an odd case: it will probably never happen.
+      return new Meta.Frame(nextFetchOffset, false, Collections.emptyList());

Review Comment:
   If we return a batch of 0 rows, does the Avatica client automatically retry until it sees EOF, or is the client at risk of concluding that there are 0 rows of data? I assume it waits for EOF; I just wanted to double-check.





[GitHub] [druid] paul-rogers merged pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
paul-rogers merged PR #13196:
URL: https://github.com/apache/druid/pull/13196




[GitHub] [druid] paul-rogers commented on a diff in pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r993885049


##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java:
##########
@@ -95,7 +97,12 @@ public static <T extends Throwable> T logFailure(T error, String message, Object
    */
   public static <T extends Throwable> T logFailure(T error)
   {
-    logFailure(error, error.getMessage());
+    if (error instanceof NoSuchConnectionException) {

Review Comment:
   The reason is that Avatica itself handles this exception specially in `AbstractHandler`. Avatica maps this exception to a specific error code. It would be nice if this were a subclass of some Avatica base exception, but it isn't. 
   
   We catch `NoSuchConnectionException` so we can exclude it from `mapException(.)`. `mapException(.)` is a rough-and-ready way to clean up exceptions. Looking at this more closely, it seems we have to guess where to catch `NoSuchConnectionException`. In some places, we guessed wrong: catching it when it can never be thrown. So, a better solution is to let `mapException(.)` special-case the Avatica exceptions, rather than trying to guess where these exceptions might be thrown and handling them in individual `catch` clauses.
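   Here is a rough sketch of the direction described above. A single mapping function passes the specially handled exception through untouched, so individual call sites no longer need to guess where to catch it. `NoSuchConnectionStandIn` is a hypothetical stand-in for Avatica's `NoSuchConnectionException`. The generic wrapping in the fallback branch is a placeholder and does not reproduce what Druid's `mapException(.)` actually does.

   ```java
   /** Hypothetical stand-in for Avatica's NoSuchConnectionException (an unchecked exception). */
   final class NoSuchConnectionStandIn extends RuntimeException {
     NoSuchConnectionStandIn(String message) {
       super(message);
     }
   }

   /** Hypothetical central mapper: one place decides how errors leave the JDBC handler. */
   final class ExceptionMapper {
     static RuntimeException mapException(Throwable t) {
       if (t instanceof NoSuchConnectionStandIn) {
         // Keep the original type so the protocol layer can assign its specific error code.
         return (RuntimeException) t;
       }
       // Placeholder for the real cleanup logic: wrap everything else generically.
       return new RuntimeException(t.getMessage(), t);
     }
   }
   ```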
   
   Looking at `AbstractHandler`, it seems we should be throwing `AvaticaRuntimeException`s in our JDBC code so we can attach the preferred `SqlState`, `Severity` and `ErrorCode`. I'll make a note of this and tackle it in a later PR. This use of `AvaticaRuntimeException` parallels the discussion we've had about the proposed `DruidException`.
   
   While doing the above, I noticed that we catch the checked Avatica exception `NoSuchStatementException` when we don't need to. I'm not sure why IntelliJ inspections didn't complain. Fixed these also.





[GitHub] [druid] FrankChen021 commented on pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on PR #13196:
URL: https://github.com/apache/druid/pull/13196#issuecomment-1272459343

   > This PR uses an internal feature of JDBC to avoid timeouts by returning "empty batches" of rows when Druid takes too long to return the actual rows. 
   
   I always thought the HTTP timeout is a client behavior, and didn't know that it can be controlled by the server. 
   And I'm curious: is such an internal feature implemented only by the Avatica JDBC driver? Could you point me to some material to learn more about it?




[GitHub] [druid] FrankChen021 commented on pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on PR #13196:
URL: https://github.com/apache/druid/pull/13196#issuecomment-1274696355

   I was wondering what the proper tag for this PR would be.
   
   We have a label `Apache Avatica`, but that seems more related to the Avatica client instead of our server side implementation. 
   
   And `Area - Querying` is more related to query planning, query engine, query execution etc. Maybe we need a tag `Area - JDBC` to tag any issues about our server side processing related to JDBC.
   
   What do you think, @paul-rogers @kfaraz?




[GitHub] [druid] paul-rogers commented on a diff in pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r992832687


##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset);
+    Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), TimeUnit.MILLISECONDS);

Review Comment:
   Great question! There is a tradeoff. Keeping the old code would be best done by having two variations of this class: the original one and this revised one. That would seem to increase the error surface by having more opportunities for bugs to creep in.
   
   It would seem that the new functionality is binary: it works or it doesn't. Users are obligated to use the Avatica client, so if we test that the zero-batch trick works for that client, we should be good.
   
   So my thought is that we have just the one implementation and let tests tell us if anything broke. Let's revisit this decision if the tests tell us something is flaky and our assumptions are not valid.



##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset);
+    Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), TimeUnit.MILLISECONDS);
+      if (result.done) {
         state = State.DONE;
       }
-
-      return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+      nextFetchOffset = fetcher.offset;
+      return result;
     }
-    catch (Throwable t) {
-      throw closeAndPropagateThrowable(t);
+    catch (CancellationException | InterruptedException e) {
+      // Consider this a failure.
+      throw closeAndPropagateThrowable(e);
+    }
+    catch (ExecutionException e) {
+      // Fetch threw an error. Unwrap it.
+      throw closeAndPropagateThrowable(e.getCause());
+    }
+    catch (TimeoutException e) {
+      fetchFuture = future;
+      // Wait timed out. Return 0 rows: the client will try again later.
+      // We'll wait again on this same fetch next time.
+      // Note that when the next fetch request comes, it will use the batch
+      // size set here: any change in size will be ignored for the in-flight batch.
+      // Changing batch size mid-query is an odd case: it will probably never happen.
+      return new Meta.Frame(nextFetchOffset, false, Collections.emptyList());

Review Comment:
   There is a new test that verifies that this is true. It sleeps for 5 seconds before fetching the first row. The fetch timeout is 1 second. And, sure enough, if we add some logging (then strip out the noise):
   
   ```text
   Timeout of batch at offset 0
   Timeout of batch at offset 0 
   Fetched batch at offset 0
   ```
   
   The first two log lines indicate that we returned a zero-row batch to the Avatica client which turned around and asked us again. Again we returned no rows, so the client asked a third time,  this time with feeling, "give me some rows!", and we obliged with the 6 rows from the test table. (Sadly, we don't have access to the batch row count, so it isn't logged above.)





[GitHub] [druid] FrankChen021 commented on a diff in pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r990736258


##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -55,6 +60,104 @@
  */
 public class DruidJdbcResultSet implements Closeable
 {
+  /**
+   * Asynchronous result fetcher. JDBC operates via REST, which is subject to
+   * timeouts if a query takes to long to respond. Fortunately, JDBC uses a
+   * batched API, and is perfectly happy to get an empty batch. This class
+   * runs in a separate thread to fetch a batch. If the fetch takes too long,
+   * the JDBC request thread will time out waiting, will return an empty batch
+   * to the client, and will remember the fetch for use in the next fetch
+   * request. The result is that the time it takes to produce results for long
+   * running queries is decoupled from the HTTP timeout.
+   */
+  public static class ResultFetcher implements Callable<Meta.Frame>
+  {
+    private final int limit;
+    private int batchSize;
+    private int offset;
+    private Yielder<Object[]> yielder;
+
+    public ResultFetcher(
+        final int limit,
+        final Yielder<Object[]> yielder
+    )
+    {
+      this.limit = limit;
+      this.yielder = yielder;
+    }
+
+    /**
+     * In an ideal world, the batch size would be a constructor parameter. But, JDBC,
+     * oddly, allows a different batch size per request. Hence, we set the size using
+     * this method before each fetch attempt.
+     */
+    public void setBatchSize(int batchSize)
+    {
+      this.batchSize = batchSize;
+    }
+
+    /**
+     * Result is only valid between executions, which turns out to be
+     * the only time it is called.
+     */
+    public int offset()
+    {
+      return offset;
+    }
+
+    /**
+     * Fetch the next batch up to the batch size or EOF. Return
+     * the resulting frame. Exceptions are handled by the executor
+     * framework.
+     */
+    @Override
+    public Meta.Frame call()
+    {
+      Preconditions.checkState(batchSize > 0);
+      int rowCount = 0;
+      final int batchLimit = Math.min(limit - offset, batchSize);
+      Yielder<Object[]> yielder = this.yielder;
+      final List<Object> rows = new ArrayList<>();

Review Comment:
   ```suggestion
         final List<Object> rows = new ArrayList<>(batchLimit);
   ```
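For context, here is a hedged sketch of how the batch loop in `call()` might use the suggested pre-sizing. The hunk above stops before the loop body, so this is not the PR's code. `BatchFetcher` and `RowBatch` are hypothetical, and a plain `Iterator<Object[]>` stands in for Druid's `Yielder`. At most `batchLimit` rows are added, so sizing the list up front avoids regrowth.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical batch: start offset, EOF flag and the rows themselves.
final class RowBatch {
  final int offset;
  final boolean done;
  final List<Object> rows;

  RowBatch(int offset, boolean done, List<Object> rows) {
    this.offset = offset;
    this.done = done;
    this.rows = rows;
  }
}

// Hypothetical fetcher; an Iterator stands in for Druid's Yielder.
final class BatchFetcher {
  private final int limit;
  private final Iterator<Object[]> source;
  private int batchSize;
  private int offset;

  BatchFetcher(int limit, Iterator<Object[]> source) {
    this.limit = limit;
    this.source = source;
  }

  // JDBC may send a different fetch size on each request, so it is set per fetch.
  void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  int offset() {
    return offset;
  }

  RowBatch next() {
    if (batchSize <= 0) {
      throw new IllegalStateException("batch size must be set before fetching");
    }
    // Never read past the overall row limit, nor more than one batch.
    final int batchLimit = Math.min(limit - offset, batchSize);
    // At most batchLimit rows are added, so size the list up front.
    final List<Object> rows = new ArrayList<>(batchLimit);
    while (rows.size() < batchLimit && source.hasNext()) {
      rows.add(source.next());
    }
    final int start = offset;
    offset += rows.size();
    final boolean done = offset >= limit || !source.hasNext();
    return new RowBatch(start, done, rows);
  }

  public static void main(String[] args) {
    BatchFetcher fetcher = new BatchFetcher(7, Collections.nCopies(10, new Object[] {"x"}).iterator());
    fetcher.setBatchSize(3);
    RowBatch batch;
    do {
      batch = fetcher.next();
      System.out.println("offset " + batch.offset + ": " + batch.rows.size() + " rows, done=" + batch.done);
    } while (!batch.done);
  }
}
```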



##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -73,25 +176,46 @@ public class DruidJdbcResultSet implements Closeable
    * https://github.com/apache/druid/pull/4288
    * https://github.com/apache/druid/pull/4415
    */
-  private final ExecutorService yielderOpenCloseExecutor;
+  private final ExecutorService queryExecutor;
   private final DirectStatement stmt;
   private final long maxRowCount;
+  private final ResultFetcherFactory fetcherFactory;
   private State state = State.NEW;
   private Meta.Signature signature;
-  private Yielder<Object[]> yielder;
-  private int offset;
+
+  /**
+   * The fetcher to use to read batches of rows. Holds onto the yielder for a

Review Comment:
   ```suggestion
      * The fetcher used to read batches of rows. Holds onto the yielder for a
   ```



##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -73,25 +176,46 @@ public class DruidJdbcResultSet implements Closeable
    * https://github.com/apache/druid/pull/4288
    * https://github.com/apache/druid/pull/4415
    */
-  private final ExecutorService yielderOpenCloseExecutor;
+  private final ExecutorService queryExecutor;
   private final DirectStatement stmt;
   private final long maxRowCount;
+  private final ResultFetcherFactory fetcherFactory;
   private State state = State.NEW;
   private Meta.Signature signature;
-  private Yielder<Object[]> yielder;
-  private int offset;
+
+  /**
+   * The fetcher to use to read batches of rows. Holds onto the yielder for a
+   * query. Maintains the current read offset.
+   */
+  private ResultFetcher fetcher;
+
+  /**
+   * Future for a fetch that timed out waiting, and should be use again on

Review Comment:
   ```suggestion
      * Future for a fetch that timed out waiting, and should be used again on
   ```



##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java:
##########
@@ -95,7 +97,12 @@ public static <T extends Throwable> T logFailure(T error, String message, Object
    */
   public static <T extends Throwable> T logFailure(T error)
   {
-    logFailure(error, error.getMessage());
+    if (error instanceof NoSuchConnectionException) {

Review Comment:
   Is there a reason we need a special case for `NoSuchConnectionException`? A comment in the code explaining why would help readers understand it.



##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java:
##########
@@ -139,7 +147,9 @@ public DruidMeta(
                 .setDaemon(true)
                 .build()
         ),
-        authMapper.getAuthenticatorChain()
+        authMapper.getAuthenticatorChain(),
+        // To prevent server hammering, the timeout must be at least 1 second.
+        new ResultFetcherFactory(Math.max(1000, config.getFetchTimeoutMs()))

Review Comment:
   No need to call `Math.max` here: the constructor of `ResultFetcherFactory` already guarantees a minimum timeout of 1 second.
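To illustrate the point, here is a minimal sketch of a factory that enforces the floor in its own constructor, so call sites can pass the configured value straight through. `ClampingFetcherFactory` is a hypothetical name, not Druid's `ResultFetcherFactory`. The 1000 ms floor matches the "at least 1 second" comment in the hunk above.

```java
// Hypothetical factory that owns the minimum-timeout rule.
final class ClampingFetcherFactory {
  // Floor matching the "at least 1 second" comment in the hunk above.
  static final long MIN_FETCH_TIMEOUT_MS = 1000;

  private final long fetchTimeoutMs;

  ClampingFetcherFactory(long requestedTimeoutMs) {
    // Clamp once here; callers need no Math.max of their own.
    this.fetchTimeoutMs = Math.max(MIN_FETCH_TIMEOUT_MS, requestedTimeoutMs);
  }

  long fetchTimeoutMs() {
    return fetchTimeoutMs;
  }

  public static void main(String[] args) {
    System.out.println(new ClampingFetcherFactory(200).fetchTimeoutMs());
    System.out.println(new ClampingFetcherFactory(5000).fetchTimeoutMs());
  }
}
```

Keeping the rule in one place means a future change to the floor cannot leave a stale duplicate behind in `DruidMeta`.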



##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -55,6 +60,104 @@
  */
 public class DruidJdbcResultSet implements Closeable
 {
+  /**
+   * Asynchronous result fetcher. JDBC operates via REST, which is subject to
+   * timeouts if a query takes to long to respond. Fortunately, JDBC uses a

Review Comment:
   ```suggestion
      * timeout if a query takes too long to respond. Fortunately, JDBC uses a
   ```




[GitHub] [druid] FrankChen021 commented on pull request #13196: Async reads for JDBC

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on PR #13196:
URL: https://github.com/apache/druid/pull/13196#issuecomment-1272467420

   > Just to be clear, this polling is a feature of the Avatica client. 
   
   I see: it's a mechanism provided by the client. No wonder I haven't seen this behaviour in other HTTP-based JDBC clients.
   This is a good feature that makes it easier for client users to set proper timeout settings.
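Here is a simplified sketch of the client side of this mechanism. The caller keeps requesting frames and treats an empty frame that is not marked done as "no rows yet". It stops only at EOF. This is not Avatica's actual client code. `FrameSource`, `ClientFrame`, `ScriptedServer` and `PollingClient` are hypothetical, and the scripted server simply replays two empty frames before returning data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical frame as seen by the client: EOF flag plus rows.
final class ClientFrame {
  final boolean done;
  final List<Object> rows;

  ClientFrame(boolean done, List<Object> rows) {
    this.done = done;
    this.rows = rows;
  }
}

// Hypothetical server interface: fetch rows starting at an offset.
interface FrameSource {
  ClientFrame fetch(long offset, int fetchSize);
}

// Test double that replays a fixed sequence of frames and counts requests.
final class ScriptedServer implements FrameSource {
  private final Deque<ClientFrame> script;
  int calls;

  ScriptedServer(List<ClientFrame> frames) {
    this.script = new ArrayDeque<>(frames);
  }

  @Override
  public ClientFrame fetch(long offset, int fetchSize) {
    calls++;
    return script.removeFirst();
  }
}

final class PollingClient {
  // Keep fetching until the server reports EOF; empty frames just mean "ask again".
  static List<Object> readAll(FrameSource server, int fetchSize) {
    List<Object> all = new ArrayList<>();
    long offset = 0;
    while (true) {
      ClientFrame frame = server.fetch(offset, fetchSize);
      all.addAll(frame.rows);
      offset += frame.rows.size();
      if (frame.done) {
        return all;
      }
    }
  }

  static ScriptedServer demoServer() {
    return new ScriptedServer(Arrays.asList(
        new ClientFrame(false, Collections.emptyList()),
        new ClientFrame(false, Collections.emptyList()),
        new ClientFrame(false, Arrays.<Object>asList(1, 2, 3)),
        new ClientFrame(true, Arrays.<Object>asList(4, 5, 6))));
  }

  public static void main(String[] args) {
    ScriptedServer server = demoServer();
    List<Object> rows = readAll(server, 100);
    System.out.println(rows.size() + " rows after " + server.calls + " requests");
  }
}
```

Because each empty frame is answered immediately, every individual HTTP request stays short even when the query as a whole runs long.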

