Posted to github@arrow.apache.org by "ArgusLi (via GitHub)" <gi...@apache.org> on 2023/05/16 01:12:55 UTC

[GitHub] [arrow] ArgusLi opened a new pull request, #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

ArgusLi opened a new pull request, #35603:
URL: https://github.com/apache/arrow/pull/35603

   ### Rationale for this change
   
   The change leads to a performance increase when there are more than 2 million rows of data:
   ![image](https://github.com/apache/arrow/assets/43020525/ecc11292-9553-48c3-8e07-651527740481)
   These results were taken from an automated test that we've discovered may not be the best indicator of performance.
   
   A more recent manual test fetching 500k rows showed that this change made the fetch about 4% faster: 114,256 ms vs 118,706 ms.
   
   The performance enhancement is iterative; we wanted to create this PR early to engage the community and get feedback as soon as possible. More results are pending, as we are working with a customer to measure the effect in their lab. The customer's use case involves a fast consumer that moves millions of rows back and forth over the network, so our hypothesis was that double buffering would improve performance.
   
   ### What changes are included in this PR?
   
   - Implement a double buffer blocking queue for VectorSchemaRoots.
   - Produce VectorSchemaRoots asynchronously.
   - Read the "buffersize" driver property to initialise the blocking queue with a custom capacity.
   - Use VectorLoader and VectorUnloader.
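
As a rough illustration of the producer/consumer shape described in the list above, here is a minimal sketch using only the JDK. It is not the PR's code: `PrefetchSketch`, `fetchAll`, and the `END` sentinel are hypothetical names, and plain `int[]` batches stand in for VectorSchemaRoots.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in: int[] batches play the role of VectorSchemaRoots.
final class PrefetchSketch {
  // Sentinel that tells the consumer the producer has no more batches.
  private static final int[] END = new int[0];

  /** Produces batchCount batches in the background and consumes them on the calling thread. */
  static List<Integer> fetchAll(int batchCount, int capacity) {
    BlockingQueue<int[]> queue = new LinkedBlockingQueue<>(capacity);

    CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
      try {
        for (int i = 0; i < batchCount; i++) {
          queue.put(new int[] {i}); // blocks while the queue is full
        }
        queue.put(END);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("producer interrupted", e);
      }
    });

    List<Integer> seen = new ArrayList<>();
    try {
      // take() blocks until the producer has something ready.
      for (int[] batch = queue.take(); batch != END; batch = queue.take()) {
        seen.add(batch[0]);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("consumer interrupted", e);
    }
    producer.join();
    return seen;
  }
}
```

The bounded queue is what provides backpressure here: the producer can run at most `capacity` batches ahead of the consumer.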
   
   ### Are these changes tested?
   
   Yes. There is no change in functionality, so the existing tests are adequate.
   
   ### Are there any user-facing changes?
   
   No.
   
   * Closes: #35559


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] lidavidm commented on a diff in pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #35603:
URL: https://github.com/apache/arrow/pull/35603#discussion_r1283201834


##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -99,23 +136,33 @@ static ArrowFlightJdbcFlightStreamResultSet fromFlightInfo(
     final ArrowFlightJdbcFlightStreamResultSet resultSet =
         new ArrowFlightJdbcFlightStreamResultSet(connection, state, signature, resultSetMetaData,
             timeZone, null);
-
     resultSet.transformer = transformer;
 
     resultSet.execute(flightInfo);
     return resultSet;
   }
 
-  private void loadNewQueue() {
-    Optional.ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
-    flightStreamQueue = createNewQueue(connection.getExecutorService());
+  /**
+   * Gets the Blocking Queue Capacity from {@link ArrowFlightConnection} properties.
+   *
+   * @return Blocking Queue Capacity set or Default Blocking Queue Capacity
+   * @throws SQLException for Invalid Blocking Queue Capacity set
+   */
+  private int getBlockingQueueCapacity() throws SQLException {
+    try {
+      return ofNullable(connection.getClientInfo().getProperty(BLOCKING_QUEUE_PARAM))
+              .map(Integer::parseInt)
+              .filter(s -> s > 0)
+              .orElse(DEFAULT_BLOCKING_QUEUE_CAPACITY);
+    } catch (java.lang.NumberFormatException e) {
+      throw new SQLException("Invalid value for 'buffersize' was provided", e);
+    }
   }
 
-  private void loadNewFlightStream() throws SQLException {
-    if (currentFlightStream != null) {
-      AutoCloseables.closeNoChecked(currentFlightStream);
+  private void initializeVectorSchemaRootsQueue(int blockingQueueCapacity) {
+    if (vectorSchemaRoots == null) {
+      vectorSchemaRoots = new LinkedBlockingQueue<>(blockingQueueCapacity);
     }
-    this.currentFlightStream = getNextFlightStream(true);
   }

Review Comment:
   Does this really need to be a separate method? It can just be inlined into the constructor. (Also, won't vectorSchemaRoots always be null to start with? If we initialize it in the constructor, we can make it final too.)





[GitHub] [arrow] lidavidm commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1549539028

   > So gRPC should already be buffering data
   
   The flip side is that due to a design decision in gRPC-Java, it's actually hard for a Java _server_ to properly saturate the connection, because gRPC uses a fixed-size buffer for its backpressure indicator. That may also be something to explore, if not already explored. (See https://github.com/grpc/proposal/pull/135)




[GitHub] [arrow] lidavidm commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1549534625

   So gRPC should already be buffering data, meaning that this change is mostly about pipelining the IPC work - except IPC deserialization should be very cheap. That could explain why the benefits are small.
   
   4% is something, but the effects still seem very small.
   
   If performance with millions of rows is a concern, has there been any profiling or tracing to identify the bottlenecks? 




[GitHub] [arrow] ArgusLi commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "ArgusLi (via GitHub)" <gi...@apache.org>.
ArgusLi commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1550437343

   Thanks David, you've given us quite a lot to think about. We'll investigate the configurable back pressure threshold and look at the customer's use case a bit more. Thanks once again.




[GitHub] [arrow] ArgusLi commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "ArgusLi (via GitHub)" <gi...@apache.org>.
ArgusLi commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1624350856

   @lidavidm the customer has come back with data that shows this change leads to a decrease of 5-40% in time taken. Unfortunately, they showed the data in a screen share during a meeting, and due to client confidentiality, we are not at liberty to share this data. Given that we have data that shows there is a performance enhancement due to this change, we’d like to push this PR forward.




[GitHub] [arrow] lidavidm commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1629015702

   Additionally, it seems CI is failing




[GitHub] [arrow] github-actions[bot] commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1548826190

   * Closes: #35559




[GitHub] [arrow] ArgusLi commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "ArgusLi (via GitHub)" <gi...@apache.org>.
ArgusLi commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1549921633

   @lidavidm That is quite interesting... thanks for the link!
   
   We did do some profiling - multiple thread dumps while the JDBC client was fetching data and in all the snapshots it was waiting for next().
   
   ```
   DBeaver: Read data [SELECT * FROM "mongo.yelp"."review"]awaiting notification on [ 0x00000007cb2c85a0 ]
   at jdk.internal.misc.Unsafe.park(java.base@17/Native Method)
   at java.util.concurrent.locks.LockSupport.park(java.base@17/Unknown Source)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@17/Unknown Source)
   at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17/Unknown Source)
   at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17/Unknown Source)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@17/Unknown Source)
   at java.util.concurrent.LinkedBlockingQueue.take(java.base@17/Unknown Source)
   at cfjd.org.apache.arrow.flight.FlightStream.next(FlightStream.java:231)
   at org.apache.arrow.driver.jdbc.ArrowFlightJdbcFlightStreamResultSet.next(ArrowFlightJdbcFlightStreamResultSet.java:184)
   at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCResultSetImpl.next(JDBCResultSetImpl.java:272)
   ```




[GitHub] [arrow] lidavidm commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1625465040

   Hmm, I'm still a little surprised, since it's unclear why this is faster, for the reasons I already gave. But I'll give it a review when I find time.




[GitHub] [arrow] lidavidm commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1663990023

   That said, it seems things are still failing. (Possibly close() needs to wait for any background tasks to complete/shut down the thread pool?)
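
One way to read the suggestion above is that `close()` should not return while a background task may still touch the resources being released. The sketch below shows that idea with a plain `ExecutorService`; it is an assumption about the shape of a fix, not the driver's actual code, and `BackgroundCloseSketch` with its 5-second bound is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical resource that owns one background task.
final class BackgroundCloseSketch implements AutoCloseable {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicBoolean taskFinished = new AtomicBoolean(false);

  void start() {
    executor.execute(() -> {
      try {
        Thread.sleep(50); // simulated background work
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        taskFinished.set(true);
      }
    });
  }

  boolean taskFinished() {
    return taskFinished.get();
  }

  @Override
  public void close() {
    executor.shutdown(); // accept no new tasks, let submitted ones finish
    try {
      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        executor.shutdownNow(); // give up waiting and interrupt the task
      }
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt();
    }
    // Only after this point would it be safe to release buffers the task was using.
  }
}
```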




[GitHub] [arrow] github-actions[bot] commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1548833264

   :warning: GitHub issue #35559 **has no components**, please add labels for components.




[GitHub] [arrow] lidavidm commented on a diff in pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #35603:
URL: https://github.com/apache/arrow/pull/35603#discussion_r1258281240


##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java:
##########
@@ -110,6 +110,7 @@ private static ArrowFlightSqlClientHandler createNewClientHandler(
     } catch (final SQLException e) {
       try {
         allocator.close();
+        allocator.getChildAllocators().forEach(BufferAllocator::close);

Review Comment:
   1) Can we use AutoCloseables?
   2) This needs to come before the root allocator is closed anyways



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -204,6 +259,21 @@ public boolean next() throws SQLException {
     }
   }
 
+  private void cleanUpResources() throws Exception {
+    if (flightStreamQueue != null) {
+      // flightStreamQueue should close currentFlightStream internally
+      flightStreamQueue.close();
+    } else if (currentFlightStream != null) {
+      // close is only called for currentFlightStream if there's no queue
+      currentFlightStream.close();
+    }
+
+    List<VectorSchemaRoot> roots = new LinkedList<>();
+    vectorSchemaRoots.drainTo(roots);
+    roots.forEach(AutoCloseables::closeNoChecked);
+    ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);

Review Comment:
   It would be preferable to do this all as one big AutoCloseables call if possible (also, is LinkedList strictly necessary here vs ArrayList?)
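
A sketch of what "one big close call" over an `ArrayList` could look like, using only the JDK. `DrainAndCloseSketch`, `closeAll`, and `cleanUp` are hypothetical names; Arrow's own `AutoCloseables` utility offers comparable helpers, which is presumably what the comment has in mind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper; plain AutoCloseable stands in for VectorSchemaRoot.
final class DrainAndCloseSketch {
  /** Closes every non-null resource; the first failure is rethrown with later ones suppressed. */
  static void closeAll(Iterable<? extends AutoCloseable> resources) throws Exception {
    Exception first = null;
    for (AutoCloseable resource : resources) {
      if (resource == null) {
        continue;
      }
      try {
        resource.close();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }

  /** Drains the queue, appends the current item, and closes everything in one pass. */
  static int cleanUp(BlockingQueue<AutoCloseable> queued, AutoCloseable current) throws Exception {
    // ArrayList is enough: the list is only appended to and iterated once.
    List<AutoCloseable> toClose = new ArrayList<>(queued.size() + 1);
    queued.drainTo(toClose);
    toClose.add(current); // may be null; closeAll skips nulls
    closeAll(toClose);
    return toClose.size();
  }
}
```

Collecting everything first means a failure while closing one item does not prevent the remaining items from being closed.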



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java:
##########
@@ -92,18 +93,11 @@ protected AvaticaResultSet execute() throws SQLException {
     throw new RuntimeException("Can only execute with execute(VectorSchemaRoot)");
   }
 
-  void execute(final VectorSchemaRoot vectorSchemaRoot) {
-    final List<Field> fields = vectorSchemaRoot.getSchema().getFields();
-    final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(fields);
-    signature.columns.clear();
-    signature.columns.addAll(columns);
-
-    this.vectorSchemaRoot = vectorSchemaRoot;
-    execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), this.signature.columns);
-  }
-
   void execute(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
-    final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(schema.getFields());
+    final List<Field> fields = Optional.ofNullable(schema)

Review Comment:
   nit: can't this just be a ternary? There's no reason to box/unbox an Optional here
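
For illustration, the ternary form of such a fallback might look like the sketch below. The quoted diff line is truncated, so the fallback to the root's own fields is an assumption based on the removed `execute(VectorSchemaRoot)` overload; `TernarySketch` and `chooseFields` are hypothetical, and strings stand in for `Field` objects.

```java
import java.util.List;

// Hypothetical stand-in: List<String> plays the role of List<Field>.
final class TernarySketch {
  /** Prefers the explicitly supplied fields and falls back to the root's fields when absent. */
  static List<String> chooseFields(List<String> explicitFields, List<String> rootFields) {
    // Same result as Optional.ofNullable(explicitFields).orElse(rootFields), without the wrapper.
    return explicitFields != null ? explicitFields : rootFields;
  }
}
```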



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -45,15 +54,18 @@
  */
 public final class ArrowFlightJdbcFlightStreamResultSet
     extends ArrowFlightJdbcVectorSchemaRootResultSet {
-
+  private static final String BLOCKING_QUEUE_PARAM = "buffersize";

Review Comment:
   Can we add this to the documentation?
   
   Can we add a test of setting this parameter?
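
A test of this parameter could exercise the three cases visible in the `getBlockingQueueCapacity` diff: property absent, property valid, and property not a number. The sketch below isolates that parsing logic so it can be checked without a connection. `BufferSizeSketch` and `capacityFrom` are hypothetical, and the default of 5 is a placeholder because the real value of `DEFAULT_BLOCKING_QUEUE_CAPACITY` is not shown in this thread.

```java
import java.sql.SQLException;
import java.util.Properties;

// Hypothetical extraction of the property parsing shown in the diff.
final class BufferSizeSketch {
  static final String PARAM = "buffersize";
  static final int DEFAULT_CAPACITY = 5; // placeholder; the real default is not shown in the thread

  /** Returns the configured capacity, or the default when the property is absent or not positive. */
  static int capacityFrom(Properties props) throws SQLException {
    String raw = props.getProperty(PARAM);
    if (raw == null) {
      return DEFAULT_CAPACITY;
    }
    try {
      int value = Integer.parseInt(raw);
      // Mirrors the diff's filter(s -> s > 0): non-positive values silently fall back.
      return value > 0 ? value : DEFAULT_CAPACITY;
    } catch (NumberFormatException e) {
      throw new SQLException("Invalid value for 'buffersize' was provided", e);
    }
  }
}
```

Note that zero and negative values fall back to the default rather than raising an error, which may be worth stating in the documentation.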



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
   }
 
   private void execute(final FlightInfo flightInfo) throws SQLException {
-    loadNewQueue();
+    // load new FlightStreamQueue
+    ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+    flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+    // load new FlightStream
     flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
-    loadNewFlightStream();
+    ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+    currentFlightStream = getNextFlightStream(true);
 
     // Ownership of the root will be passed onto the cursor.
     if (currentFlightStream != null) {
-      executeForCurrentFlightStream();
+      storeRootsFromStreamAsync();
+      executeNextRoot();
+    }
+  }
+
+  private BufferAllocator getAllocator() {
+    if (allocator == null) {
+      allocator = connection.getBufferAllocator().newChildAllocator("vsr-copier", 0, Long.MAX_VALUE);
     }
+
+    return allocator;
   }
 
-  private void executeForCurrentFlightStream() throws SQLException {
-    final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+  private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+    VectorSchemaRoot theRoot = VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+    VectorLoader loader = new VectorLoader(theRoot);
+    VectorUnloader unloader = new VectorUnloader(originalRoot);
+    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+      loader.load(recordBatch);
+    }
+    return theRoot;
+  }
 
+  private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+    VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+    VectorSchemaRoot transformedRoot = null;
     if (transformer != null) {
       try {
-        currentVectorSchemaRoot = transformer.transform(originalRoot, currentVectorSchemaRoot);
+        transformedRoot = transformer.transform(theRoot, null);
+        theRoot.close();
       } catch (final Exception e) {
         throw new SQLException("Failed to transform VectorSchemaRoot.", e);
       }
-    } else {
-      currentVectorSchemaRoot = originalRoot;
     }
 
-    if (schema != null) {
-      execute(currentVectorSchemaRoot, schema);
-    } else {
-      execute(currentVectorSchemaRoot);
+    try {
+      vectorSchemaRoots.put(ofNullable(transformedRoot).orElse(theRoot));
+    } catch (InterruptedException e) {
+      throw new SQLException("Could not put root to the queue", e);
     }
   }
 
+  private void executeNextRoot() throws SQLException {
+    try {
+      ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
+      currentRoot = vectorSchemaRoots.poll(10, TimeUnit.SECONDS);
+      execute(currentRoot, schema);
+    } catch (InterruptedException e) {
+      throw new SQLException("Could not take root from the queue", e);
+    }
+  }
+
+  private void storeRootsFromStreamAsync() {
+    CompletableFuture.runAsync(() -> {
+      while (vectorSchemaRoots.remainingCapacity() > 0) {

Review Comment:
   multiple calls to `next` might spawn this task multiple times, and then they may push too many items onto the queue, right?
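
One common guard against the double-spawn concern raised above is a compare-and-set flag, so that at most one producer task is in flight at a time. This is a sketch of that general technique, not a claim about how the PR resolved the comment; `SingleProducerSketch` and `startProducerIfIdle` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard that allows only one background producer at a time.
final class SingleProducerSketch {
  private final AtomicBoolean producerRunning = new AtomicBoolean(false);

  /** Returns true if this call started the producer, false if one was already running. */
  boolean startProducerIfIdle(Runnable work) {
    // Only the caller that flips false -> true may start the task.
    if (!producerRunning.compareAndSet(false, true)) {
      return false;
    }
    CompletableFuture.runAsync(work)
        // Reset the flag whether the task succeeded or failed.
        .whenComplete((result, error) -> producerRunning.set(false));
    return true;
  }
}
```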



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
   }
 
   private void execute(final FlightInfo flightInfo) throws SQLException {
-    loadNewQueue();
+    // load new FlightStreamQueue
+    ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+    flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+    // load new FlightStream
     flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
-    loadNewFlightStream();
+    ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+    currentFlightStream = getNextFlightStream(true);
 
     // Ownership of the root will be passed onto the cursor.
     if (currentFlightStream != null) {
-      executeForCurrentFlightStream();
+      storeRootsFromStreamAsync();
+      executeNextRoot();
+    }
+  }
+
+  private BufferAllocator getAllocator() {
+    if (allocator == null) {
+      allocator = connection.getBufferAllocator().newChildAllocator("vsr-copier", 0, Long.MAX_VALUE);
     }
+
+    return allocator;
   }
 
-  private void executeForCurrentFlightStream() throws SQLException {
-    final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+  private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+    VectorSchemaRoot theRoot = VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+    VectorLoader loader = new VectorLoader(theRoot);
+    VectorUnloader unloader = new VectorUnloader(originalRoot);
+    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+      loader.load(recordBatch);
+    }
+    return theRoot;
+  }
 
+  private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+    VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+    VectorSchemaRoot transformedRoot = null;
     if (transformer != null) {
       try {
-        currentVectorSchemaRoot = transformer.transform(originalRoot, currentVectorSchemaRoot);
+        transformedRoot = transformer.transform(theRoot, null);
+        theRoot.close();
       } catch (final Exception e) {
         throw new SQLException("Failed to transform VectorSchemaRoot.", e);
       }
-    } else {
-      currentVectorSchemaRoot = originalRoot;
     }
 
-    if (schema != null) {
-      execute(currentVectorSchemaRoot, schema);
-    } else {
-      execute(currentVectorSchemaRoot);
+    try {
+      vectorSchemaRoots.put(ofNullable(transformedRoot).orElse(theRoot));
+    } catch (InterruptedException e) {
+      throw new SQLException("Could not put root to the queue", e);
     }
   }
 
+  private void executeNextRoot() throws SQLException {
+    try {
+      ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
+      currentRoot = vectorSchemaRoots.poll(10, TimeUnit.SECONDS);

Review Comment:
   Do we want to have a hard timeout like this? We don't know how long it might take for the server to respond
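
An alternative to a single hard timeout is to poll in short intervals and stop only when the producer itself has finished or failed. The sketch below assumes the producer is tracked as a `CompletableFuture`, which is not something this thread confirms; `AwaitNextSketch`, `awaitNext`, and the 100 ms interval are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical consumer-side wait that has no overall deadline.
final class AwaitNextSketch {
  /**
   * Waits for the next item for as long as the producer is still running.
   * Returns null once the producer has finished and the queue is empty.
   */
  static <T> T awaitNext(BlockingQueue<T> queue, CompletableFuture<?> producer) {
    try {
      while (true) {
        T item = queue.poll(100, TimeUnit.MILLISECONDS);
        if (item != null) {
          return item;
        }
        if (producer.isDone()) {
          // Pick up anything enqueued between the poll above and the isDone check.
          T last = queue.poll();
          if (last == null) {
            producer.join(); // rethrows a producer failure as CompletionException
          }
          return last;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the next batch", e);
    }
  }
}
```

With this shape a slow server only delays the caller, while a failed producer surfaces as an exception instead of a null result after a fixed wait.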



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
   }
 
   private void execute(final FlightInfo flightInfo) throws SQLException {
-    loadNewQueue();
+    // load new FlightStreamQueue
+    ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+    flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+    // load new FlightStream
     flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
-    loadNewFlightStream();
+    ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+    currentFlightStream = getNextFlightStream(true);
 
     // Ownership of the root will be passed onto the cursor.
     if (currentFlightStream != null) {
-      executeForCurrentFlightStream();
+      storeRootsFromStreamAsync();
+      executeNextRoot();
+    }
+  }
+
+  private BufferAllocator getAllocator() {
+    if (allocator == null) {
+      allocator = connection.getBufferAllocator().newChildAllocator("vsr-copier", 0, Long.MAX_VALUE);
     }
+
+    return allocator;

Review Comment:
   Can this also be just initialized in the constructor?



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
   }
 
   private void execute(final FlightInfo flightInfo) throws SQLException {
-    loadNewQueue();
+    // load new FlightStreamQueue
+    ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+    flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+    // load new FlightStream
     flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
-    loadNewFlightStream();
+    ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+    currentFlightStream = getNextFlightStream(true);
 
     // Ownership of the root will be passed onto the cursor.
     if (currentFlightStream != null) {
-      executeForCurrentFlightStream();
+      storeRootsFromStreamAsync();
+      executeNextRoot();
+    }
+  }
+
+  private BufferAllocator getAllocator() {
+    if (allocator == null) {
+      allocator = connection.getBufferAllocator().newChildAllocator("vsr-copier", 0, Long.MAX_VALUE);
     }
+
+    return allocator;
   }
 
-  private void executeForCurrentFlightStream() throws SQLException {
-    final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+  private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+    VectorSchemaRoot theRoot = VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+    VectorLoader loader = new VectorLoader(theRoot);
+    VectorUnloader unloader = new VectorUnloader(originalRoot);
+    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+      loader.load(recordBatch);
+    }
+    return theRoot;
+  }
 
+  private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+    VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+    VectorSchemaRoot transformedRoot = null;
     if (transformer != null) {
       try {
-        currentVectorSchemaRoot = transformer.transform(originalRoot, currentVectorSchemaRoot);
+        transformedRoot = transformer.transform(theRoot, null);
+        theRoot.close();
       } catch (final Exception e) {
         throw new SQLException("Failed to transform VectorSchemaRoot.", e);
       }
-    } else {
-      currentVectorSchemaRoot = originalRoot;
     }
 
-    if (schema != null) {
-      execute(currentVectorSchemaRoot, schema);
-    } else {
-      execute(currentVectorSchemaRoot);
+    try {
+      vectorSchemaRoots.put(ofNullable(transformedRoot).orElse(theRoot));
+    } catch (InterruptedException e) {
+      throw new SQLException("Could not put root to the queue", e);
     }
   }
 
+  private void executeNextRoot() throws SQLException {
+    try {
+      ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
+      currentRoot = vectorSchemaRoots.poll(10, TimeUnit.SECONDS);
+      execute(currentRoot, schema);
+    } catch (InterruptedException e) {
+      throw new SQLException("Could not take root from the queue", e);
+    }
+  }
+
+  private void storeRootsFromStreamAsync() {
+    CompletableFuture.runAsync(() -> {
+      while (vectorSchemaRoots.remainingCapacity() > 0) {
+        try {
+          currentFlightStream = ofNullable(currentFlightStream).orElse(getNextFlightStream(false));
+          streamHasNext = currentFlightStream.next();
+          if (!streamHasNext) {
+            flightStreamQueue.enqueue(currentFlightStream);
+          }
+          storeRoot(currentFlightStream.getRoot());
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+  }
+
   @Override
   public boolean next() throws SQLException {

Review Comment:
   The control flow between here and the background task is hard to follow; it would help to document what's going on.
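
One way to make the hand-off easier to document is to reduce it to its skeleton: a background task fills a bounded queue, and the caller drains it until an explicit end-of-stream marker arrives. The sketch below is a simplified illustration that uses only `java.util.concurrent`. The class `RootHandOffSketch`, the `END` sentinel, and the use of strings in place of `VectorSchemaRoot` are assumptions made for illustration, not code from this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the result set: strings play the role of VectorSchemaRoots.
class RootHandOffSketch {
  // Sentinel that marks end-of-stream; compared by identity, never by value.
  static final String END = new String("END");

  static List<String> drain(List<String> batches, int capacity) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
    ExecutorService producer = Executors.newSingleThreadExecutor();
    try {
      // Producer: blocks in put() while the queue is full, then signals completion.
      producer.execute(() -> {
        try {
          for (String batch : batches) {
            queue.put(batch);
          }
          queue.put(END);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });

      // Consumer: blocks in take() until a batch or the sentinel arrives.
      List<String> seen = new ArrayList<>();
      for (String next = queue.take(); next != END; next = queue.take()) {
        seen.add(next);
      }
      return seen;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a batch", e);
    } finally {
      producer.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(drain(Arrays.asList("batch-0", "batch-1", "batch-2"), 2));
  }
}
```

With an explicit sentinel, the consumer never has to infer end-of-stream from a timed-out `poll`, which is one of the points a control-flow comment would need to spell out.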



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -73,6 +86,7 @@ public final class ArrowFlightJdbcFlightStreamResultSet
                                        final Meta.Frame firstFrame) throws SQLException {
     super(null, state, signature, resultSetMetaData, timeZone, firstFrame);
     this.connection = connection;
+    initializeVectorSchemaRootsQueue();
   }

Review Comment:
   Would it be possible to have both constructors delegate to a single constructor, then inline initializeVectorSchemaRootsQueue into that constructor?
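
A minimal sketch of the delegation pattern being suggested, assuming a much-simplified class: `DelegatingResultSetSketch`, its `String` fields, and the queue capacity of 5 are hypothetical placeholders, not the real constructor signatures of `ArrowFlightJdbcFlightStreamResultSet`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for the result set class.
class DelegatingResultSetSketch {
  private final String connection;
  private final String flightInfo; // null when no FlightInfo is supplied up front
  private final BlockingQueue<String> vectorSchemaRoots;

  // Convenience constructor: forwards its arguments and does nothing else.
  DelegatingResultSetSketch(String connection) {
    this(connection, null);
  }

  // The single constructor that owns all field initialization, including the queue.
  DelegatingResultSetSketch(String connection, String flightInfo) {
    this.connection = connection;
    this.flightInfo = flightInfo;
    this.vectorSchemaRoots = new LinkedBlockingQueue<>(5); // capacity chosen arbitrarily
  }

  String connection() {
    return connection;
  }

  boolean hasFlightInfo() {
    return flightInfo != null;
  }

  int queueCapacity() {
    return vectorSchemaRoots.remainingCapacity();
  }

  public static void main(String[] args) {
    System.out.println(new DelegatingResultSetSketch("conn").queueCapacity());
    System.out.println(new DelegatingResultSetSketch("conn", "info").hasFlightInfo());
  }
}
```

Because only one constructor assigns the queue, the field can be `final` and no separate initializer method is needed.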



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
   }
 
   private void execute(final FlightInfo flightInfo) throws SQLException {
-    loadNewQueue();
+    // load new FlightStreamQueue
+    ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+    flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+    // load new FlightStream
     flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
-    loadNewFlightStream();
+    ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+    currentFlightStream = getNextFlightStream(true);
 
     // Ownership of the root will be passed onto the cursor.
     if (currentFlightStream != null) {
-      executeForCurrentFlightStream();
+      storeRootsFromStreamAsync();
+      executeNextRoot();
+    }
+  }
+
+  private BufferAllocator getAllocator() {
+    if (allocator == null) {
+      allocator = connection.getBufferAllocator().newChildAllocator("vsr-copier", 0, Long.MAX_VALUE);
     }
+
+    return allocator;
   }
 
-  private void executeForCurrentFlightStream() throws SQLException {
-    final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+  private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+    VectorSchemaRoot theRoot = VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+    VectorLoader loader = new VectorLoader(theRoot);
+    VectorUnloader unloader = new VectorUnloader(originalRoot);
+    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+      loader.load(recordBatch);
+    }
+    return theRoot;
+  }
 
+  private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+    VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+    VectorSchemaRoot transformedRoot = null;
     if (transformer != null) {
       try {
-        currentVectorSchemaRoot = transformer.transform(originalRoot, currentVectorSchemaRoot);
+        transformedRoot = transformer.transform(theRoot, null);
+        theRoot.close();
       } catch (final Exception e) {
         throw new SQLException("Failed to transform VectorSchemaRoot.", e);
       }
-    } else {
-      currentVectorSchemaRoot = originalRoot;
     }
 
-    if (schema != null) {
-      execute(currentVectorSchemaRoot, schema);
-    } else {
-      execute(currentVectorSchemaRoot);
+    try {
+      vectorSchemaRoots.put(ofNullable(transformedRoot).orElse(theRoot));
+    } catch (InterruptedException e) {
+      throw new SQLException("Could not put root to the queue", e);
     }
   }
 
+  private void executeNextRoot() throws SQLException {
+    try {
+      ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
+      currentRoot = vectorSchemaRoots.poll(10, TimeUnit.SECONDS);
+      execute(currentRoot, schema);
+    } catch (InterruptedException e) {
+      throw new SQLException("Could not take root from the queue", e);
+    }
+  }
+
+  private void storeRootsFromStreamAsync() {
+    CompletableFuture.runAsync(() -> {

Review Comment:
   This uses a global thread pool; can we instead use a pool we control?
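
For reference, `CompletableFuture.runAsync` has a two-argument overload that takes an `Executor`. The sketch below shows that overload with a caller-owned, single-threaded pool; the class `OwnedPoolSketch` and the thread name are hypothetical. In the PR itself, the executor already passed to `createNewQueue` (`connection.getExecutorService()`) might be a candidate, though that is an assumption rather than something this thread settles.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class OwnedPoolSketch {
  // Runs a task on a caller-owned executor and returns the name of the thread that ran it.
  static String runOnOwnedPool(String threadName) {
    ExecutorService pool = Executors.newSingleThreadExecutor(runnable -> {
      Thread thread = new Thread(runnable, threadName);
      thread.setDaemon(true);
      return thread;
    });
    try {
      AtomicReference<String> ranOn = new AtomicReference<>();
      // The second argument keeps the task off the shared common pool.
      CompletableFuture<Void> task =
          CompletableFuture.runAsync(() -> ranOn.set(Thread.currentThread().getName()), pool);
      task.join();
      return ranOn.get();
    } finally {
      // The owner of the pool is also responsible for shutting it down.
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runOnOwnedPool("vsr-producer"));
  }
}
```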



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
   }
 
   private void execute(final FlightInfo flightInfo) throws SQLException {
-    loadNewQueue();
+    // load new FlightStreamQueue
+    ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+    flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+    // load new FlightStream
     flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
-    loadNewFlightStream();
+    ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+    currentFlightStream = getNextFlightStream(true);
 
     // Ownership of the root will be passed onto the cursor.
     if (currentFlightStream != null) {
-      executeForCurrentFlightStream();
+      storeRootsFromStreamAsync();
+      executeNextRoot();
+    }
+  }
+
+  private BufferAllocator getAllocator() {
+    if (allocator == null) {
+      allocator = connection.getBufferAllocator().newChildAllocator("vsr-copier", 0, Long.MAX_VALUE);
     }
+
+    return allocator;
   }
 
-  private void executeForCurrentFlightStream() throws SQLException {
-    final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+  private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+    VectorSchemaRoot theRoot = VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+    VectorLoader loader = new VectorLoader(theRoot);
+    VectorUnloader unloader = new VectorUnloader(originalRoot);
+    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+      loader.load(recordBatch);
+    }
+    return theRoot;
+  }
 
+  private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+    VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+    VectorSchemaRoot transformedRoot = null;
     if (transformer != null) {
       try {
-        currentVectorSchemaRoot = transformer.transform(originalRoot, currentVectorSchemaRoot);
+        transformedRoot = transformer.transform(theRoot, null);
+        theRoot.close();

Review Comment:
   Can we use try-with-resources here instead of an explicit `close()`?
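
In the hunk above, `theRoot.close()` sits after `transformer.transform(...)` inside the `try`, so it is skipped if the transform throws. A minimal sketch of the try-with-resources alternative follows; `Root` and `Transformer` are hypothetical stand-ins for `VectorSchemaRoot` and the PR's transformer, and the sketch throws `IllegalStateException` where the real code throws `SQLException`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class TryWithResourcesSketch {
  // Hypothetical stand-in for a cloned VectorSchemaRoot.
  static final class Root implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
      closed.set(true);
    }
  }

  // Hypothetical stand-in for the transformer used in the PR.
  interface Transformer {
    String transform(Root input) throws Exception;
  }

  static String transformAndClose(Root cloned, Transformer transformer) {
    // The resource is closed when the block exits, whether transform returns or throws.
    try (Root input = cloned) {
      return transformer.transform(input);
    } catch (Exception e) {
      throw new IllegalStateException("Failed to transform root.", e);
    }
  }

  public static void main(String[] args) {
    Root root = new Root();
    System.out.println(transformAndClose(root, input -> "transformed"));
    System.out.println(root.closed.get());
  }
}
```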



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1548826209

   :warning: GitHub issue #35559 **has no components**, please add labels for components.




[GitHub] [arrow] lidavidm commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1549928769

   That's not the right thread to focus on then. If most of the time is spent waiting for `take`, that means gRPC isn't populating the queue fast enough, i.e. it seems most of the time is actually waiting for data transfer. (Or possibly we're being slow at processing messages from gRPC, which is FlightStream.Observer#onNext.) And so then I'd be curious about things like, what is the bandwidth the client is getting? What is the max theoretical bandwidth, what is the observed bandwidth in a synthetic benchmark (like iperf3)?
   
   4%, without further qualification, could still be in the realm of natural variation.
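
One rough way to qualify a figure like 4% is to compare it against the run-to-run spread of repeated baseline measurements. The sketch below computes the sample standard deviation relative to the mean; the class `RunVariationSketch` and the timings in `main` are made-up placeholders, not measurements from this PR, and this is a crude check rather than a proper significance test.

```java
class RunVariationSketch {
  // Sample standard deviation divided by the mean (coefficient of variation).
  static double relativeSpread(double[] timings) {
    if (timings.length < 2) {
      throw new IllegalArgumentException("need at least two runs");
    }
    double sum = 0;
    for (double t : timings) {
      sum += t;
    }
    double mean = sum / timings.length;
    double squares = 0;
    for (double t : timings) {
      squares += (t - mean) * (t - mean);
    }
    return Math.sqrt(squares / (timings.length - 1)) / mean;
  }

  // True when the observed speedup is larger than the baseline's run-to-run spread.
  static boolean exceedsNoise(double observedSpeedup, double[] baselineTimings) {
    return observedSpeedup > relativeSpread(baselineTimings);
  }

  public static void main(String[] args) {
    double[] placeholder = {9.0, 10.0, 11.0}; // made-up timings in seconds
    System.out.println(relativeSpread(placeholder));
    System.out.println(exceedsNoise(0.04, placeholder));
  }
}
```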




[GitHub] [arrow] ArgusLi commented on pull request #35603: GH-35559: [Java] Implementing JDBC Flight Stream Result Set asynchronous VectorSchemaRoot Producer

Posted by "ArgusLi (via GitHub)" <gi...@apache.org>.
ArgusLi commented on PR #35603:
URL: https://github.com/apache/arrow/pull/35603#issuecomment-1625733736

   Thanks @lidavidm!!!

