Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/10/11 23:01:18 UTC

[GitHub] [druid] paul-rogers commented on a diff in pull request #13196: Async reads for JDBC

paul-rogers commented on code in PR #13196:
URL: https://github.com/apache/druid/pull/13196#discussion_r992832687


##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset);
+    Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), TimeUnit.MILLISECONDS);

Review Comment:
   Great question! There is a tradeoff. Keeping the old code would best be done by having two variants of this class: the original one and this revised one. That would increase the error surface, because two implementations give bugs more places to creep in.
   
   The new functionality seems binary: it either works or it doesn't. Users must go through the Avatica client, so if we test that the zero-batch trick (returning an empty, not-done frame when a fetch times out) works with that client, we should be good.
   
   So, my thought is we just have the one implementation, and let the tests tell us if anything broke. Let's revisit this decision if the tests show that something is flaky or that our assumptions are not valid.
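   Here is a minimal sketch of the zero-batch trick from the client's side. It assumes, as the diff implies, that a frame with no rows and `done == false` means "ask again at the same offset". That is consistent with the server's `fetchOffset == nextFetchOffset` check. `Frame`, `FrameSource`, `fetchAll`, and `runDemo` are hypothetical names, not Avatica's API. The real retry loop lives inside the Avatica driver.

```java
import java.util.ArrayList;
import java.util.List;

public class ZeroBatchClientSketch {
  // Hypothetical stand-in for Avatica's Meta.Frame: start offset, done flag, rows.
  record Frame(long offset, boolean done, List<Object> rows) {}

  // Hypothetical interface mirroring the shape of DruidJdbcResultSet.nextFrame().
  interface FrameSource {
    Frame nextFrame(long fetchOffset, int fetchMaxRowCount);
  }

  // Client loop: an empty, not-done frame leaves the offset unchanged,
  // so the next request asks for the same offset again.
  static List<Object> fetchAll(FrameSource source, int batchSize) {
    List<Object> all = new ArrayList<>();
    long offset = 0;
    while (true) {
      Frame frame = source.nextFrame(offset, batchSize);
      all.addAll(frame.rows());
      offset += frame.rows().size();
      if (frame.done()) {
        return all;
      }
    }
  }

  // Scripted server: two "timed out" empty frames, then one final frame of 6 rows.
  static String runDemo() {
    List<Long> requestedOffsets = new ArrayList<>();
    FrameSource scripted = new FrameSource() {
      private int calls = 0;
      private long nextFetchOffset = 0;

      @Override
      public Frame nextFrame(long fetchOffset, int fetchMaxRowCount) {
        // Same invariant the server enforces with Preconditions.checkState.
        if (fetchOffset != nextFetchOffset) {
          throw new IllegalStateException(
              "fetchOffset " + fetchOffset + " != offset " + nextFetchOffset);
        }
        requestedOffsets.add(fetchOffset);
        calls++;
        if (calls < 3) {
          return new Frame(nextFetchOffset, false, List.of());
        }
        List<Object> rows = List.of(1, 2, 3, 4, 5, 6);
        Frame frame = new Frame(nextFetchOffset, true, rows);
        nextFetchOffset += rows.size();
        return frame;
      }
    };
    List<Object> rows = fetchAll(scripted, 100);
    return requestedOffsets + " -> " + rows.size() + " rows";
  }

  public static void main(String[] args) {
    System.out.println(runDemo()); // prints "[0, 0, 0] -> 6 rows"
  }
}
```

   All three requests carry offset 0. The empty frames never advance the client's offset, so the server's offset check keeps passing while the batch is still in flight.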



##########
sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java:
##########
@@ -139,34 +260,52 @@ public synchronized Meta.Signature getSignature()
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset);
+    Preconditions.checkState(fetchOffset == nextFetchOffset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, nextFetchOffset);
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), TimeUnit.MILLISECONDS);
+      if (result.done) {
         state = State.DONE;
       }
-
-      return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+      nextFetchOffset = fetcher.offset;
+      return result;
     }
-    catch (Throwable t) {
-      throw closeAndPropagateThrowable(t);
+    catch (CancellationException | InterruptedException e) {
+      // Consider this a failure.
+      throw closeAndPropagateThrowable(e);
+    }
+    catch (ExecutionException e) {
+      // Fetch threw an error. Unwrap it.
+      throw closeAndPropagateThrowable(e.getCause());
+    }
+    catch (TimeoutException e) {
+      fetchFuture = future;
+      // Wait timed out. Return 0 rows: the client will try again later.
+      // We'll wait again on this same fetch next time.
+      // Note that when the next fetch request comes, it will use the batch
+      // size set here: any change in size will be ignored for the in-flight batch.
+      // Changing batch size mid-query is an odd case: it will probably never happen.
+      return new Meta.Frame(nextFetchOffset, false, Collections.emptyList());

Review Comment:
   There is a new test that verifies this: a timed-out fetch returns an empty batch, and the client asks again. The test sleeps for 5 seconds before fetching the first row, and the fetch timeout is 1 second. And, sure enough, if we add some logging (then strip out the noise):
   
   ```text
   Timeout of batch at offset 0
   Timeout of batch at offset 0 
   Fetched batch at offset 0
   ```
   
   The first two log lines indicate that we returned a zero-row batch to the Avatica client, which turned around and asked us again. Again we returned no rows, so the client asked a third time, this time with feeling ("give me some rows!"), and we obliged with the 6 rows from the test table. (Sadly, we don't have access to the batch row count, so it isn't logged above.)
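   The server-side half of the pattern in the diff can be reproduced outside Druid with plain `java.util.concurrent`. The diff submits the batch to an executor and waits with a timeout. On `TimeoutException`, it keeps the same `Future` for the next call. This is a minimal sketch under stated assumptions:
   - `AsyncFrameSketch`, `Frame`, `DemoResult`, and the single-batch fetcher are hypothetical.
   - The next offset is computed from the returned frame rather than from a fetcher's `offset` field.
   - The scenario mirrors the test described above with shorter times: the fetcher sleeps 500 ms, and the fetch timeout is 100 ms.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncFrameSketch {
  // Hypothetical stand-in for Meta.Frame.
  record Frame(long offset, boolean done, List<Object> rows) {}
  record DemoResult(int emptyFrames, int rowCount) {}

  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Callable<Frame> fetcher;
  private final long fetchTimeoutMs;
  private Future<Frame> fetchFuture; // batch still in flight after a timeout
  private long nextFetchOffset;

  AsyncFrameSketch(Callable<Frame> fetcher, long fetchTimeoutMs) {
    this.fetcher = fetcher;
    this.fetchTimeoutMs = fetchTimeoutMs;
  }

  synchronized Frame nextFrame(long fetchOffset) {
    if (fetchOffset != nextFetchOffset) {
      throw new IllegalStateException("fetchOffset " + fetchOffset + " != offset " + nextFetchOffset);
    }
    // Reuse the in-flight batch if the previous call timed out; otherwise start one.
    Future<Frame> future = (fetchFuture != null) ? fetchFuture : executor.submit(fetcher);
    fetchFuture = null;
    try {
      Frame result = future.get(fetchTimeoutMs, TimeUnit.MILLISECONDS);
      nextFetchOffset = result.offset() + result.rows().size();
      return result;
    } catch (TimeoutException e) {
      // Wait on the same batch next time; tell the client "no rows yet, not done".
      fetchFuture = future;
      return new Frame(nextFetchOffset, false, Collections.emptyList());
    } catch (ExecutionException e) {
      // The fetch itself failed: surface the underlying cause.
      throw new RuntimeException(e.getCause());
    } catch (CancellationException | InterruptedException e) {
      // Treat as a failure, preserving the interrupt flag if we were interrupted.
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
    }
  }

  void close() {
    executor.shutdownNow();
  }

  static DemoResult runDemo() {
    Callable<Frame> slowFetcher = () -> {
      Thread.sleep(500); // simulate a slow first row
      return new Frame(0, true, List.of(1, 2, 3, 4, 5, 6));
    };
    AsyncFrameSketch resultSet = new AsyncFrameSketch(slowFetcher, 100);
    int emptyFrames = 0;
    List<Object> rows = new ArrayList<>();
    try {
      long offset = 0;
      while (true) {
        Frame frame = resultSet.nextFrame(offset);
        if (frame.rows().isEmpty() && !frame.done()) {
          emptyFrames++;
        }
        rows.addAll(frame.rows());
        offset += frame.rows().size();
        if (frame.done()) {
          break;
        }
      }
    } finally {
      resultSet.close();
    }
    return new DemoResult(emptyFrames, rows.size());
  }

  public static void main(String[] args) {
    DemoResult r = runDemo();
    System.out.println("empty frames: " + r.emptyFrames() + ", rows: " + r.rowCount());
  }
}
```

   The exact number of empty frames depends on timing, but at least one is guaranteed here: the first wait ends after 100 ms, while the batch needs 500 ms. Because each wait is bounded, a slow batch never holds a single request longer than `fetchTimeoutMs`. The tradeoff, as the comment in the diff notes, is that a changed batch size is ignored for the batch already in flight.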



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

