You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2024/02/21 00:01:31 UTC

(arrow) branch main updated: GH-38573: [Java][FlightRPC] Try all locations in JDBC driver (#40104)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new aa6b398592 GH-38573: [Java][FlightRPC] Try all locations in JDBC driver (#40104)
aa6b398592 is described below

commit aa6b39859261ab2f116d4d971127c53a8a5be2a5
Author: David Li <li...@gmail.com>
AuthorDate: Tue Feb 20 19:01:25 2024 -0500

    GH-38573: [Java][FlightRPC] Try all locations in JDBC driver (#40104)
    
    
    
    ### Rationale for this change
    
    This brings the JDBC driver up to par with other Flight SQL clients.
    
    ### What changes are included in this PR?
    
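    When a `FlightEndpoint` advertises multiple locations, the Flight SQL JDBC driver now
    tries each location in turn instead of treating a failure at the first one as fatal.
    If every location fails, the first error is thrown with the remaining errors attached
    as suppressed exceptions. In addition, `FlightClient` now sets `maxInboundMetadataSize`
    alongside `maxInboundMessageSize`, and `FlightEndpointDataQueue` rethrows a
    `FlightRuntimeException` cause directly instead of wrapping the `ExecutionException`.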
    
    ### Are these changes tested?
    
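    Yes. Two tests were added to `ResultSetTest`: one where the endpoint lists an
    unreachable location followed by a working one (the query succeeds), and one where
    every listed location is unreachable (the query fails with `UNAVAILABLE`).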
    
    ### Are there any user-facing changes?
    
    No
    
    * Closes: #38573
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 .../java/org/apache/arrow/flight/FlightClient.java |  3 +-
 .../jdbc/client/ArrowFlightSqlClientHandler.java   | 53 +++++++++----
 .../driver/jdbc/utils/FlightEndpointDataQueue.java |  9 ++-
 .../apache/arrow/driver/jdbc/ResultSetTest.java    | 91 +++++++++++++++++++++-
 4 files changed, 137 insertions(+), 19 deletions(-)

diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
index 980a762e39..49f9af4ebf 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
@@ -909,7 +909,8 @@ public class FlightClient implements AutoCloseable {
 
       builder
           .maxTraceEvents(MAX_CHANNEL_TRACE_EVENTS)
-          .maxInboundMessageSize(maxInboundMessageSize);
+          .maxInboundMessageSize(maxInboundMessageSize)
+          .maxInboundMetadataSize(maxInboundMessageSize);
       return new FlightClient(allocator, builder.build(), middleware);
     }
   }
diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
index 234820bd41..1b03f927d7 100644
--- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
+++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
@@ -116,26 +116,47 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
               sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
         } else {
           // Clone the builder and then set the new endpoint on it.
-          // GH-38573: This code currently only tries the first Location and treats a failure as fatal.
-          // This should be changed to try other Locations that are available.
-          
+
           // GH-38574: Currently a new FlightClient will be made for each partition that returns a non-empty Location
           // then disposed of. It may be better to cache clients because a server may report the same Locations.
           // It would also be good to identify when the reported location is the same as the original connection's
           // Location and skip creating a FlightClient in that scenario.
-          final URI endpointUri = endpoint.getLocations().get(0).getUri();
-          final Builder builderForEndpoint = new Builder(ArrowFlightSqlClientHandler.this.builder)
-              .withHost(endpointUri.getHost())
-              .withPort(endpointUri.getPort())
-              .withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
-
-          final ArrowFlightSqlClientHandler endpointHandler = builderForEndpoint.build();
-          try {
-            endpoints.add(new CloseableEndpointStreamPair(
-                endpointHandler.sqlClient.getStream(endpoint.getTicket(),
-                    endpointHandler.getOptions()), endpointHandler.sqlClient));
-          } catch (Exception ex) {
-            AutoCloseables.close(endpointHandler);
+          List<Exception> exceptions = new ArrayList<>();
+          CloseableEndpointStreamPair stream = null;
+          for (Location location : endpoint.getLocations()) {
+            final URI endpointUri = location.getUri();
+            final Builder builderForEndpoint = new Builder(ArrowFlightSqlClientHandler.this.builder)
+                    .withHost(endpointUri.getHost())
+                    .withPort(endpointUri.getPort())
+                    .withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
+
+            ArrowFlightSqlClientHandler endpointHandler = null;
+            try {
+              endpointHandler = builderForEndpoint.build();
+              stream = new CloseableEndpointStreamPair(
+                      endpointHandler.sqlClient.getStream(endpoint.getTicket(),
+                              endpointHandler.getOptions()), endpointHandler.sqlClient);
+              // Make sure we actually get data from the server
+              stream.getStream().getSchema();
+            } catch (Exception ex) {
+              if (endpointHandler != null) {
+                AutoCloseables.close(endpointHandler);
+              }
+              exceptions.add(ex);
+              continue;
+            }
+            break;
+          }
+          if (stream != null) {
+            endpoints.add(stream);
+          } else if (exceptions.isEmpty()) {
+            // This should never happen...
+            throw new IllegalStateException("Could not connect to endpoint and no errors occurred");
+          } else {
+            Exception ex = exceptions.remove(0);
+            while (!exceptions.isEmpty()) {
+              ex.addSuppressed(exceptions.remove(exceptions.size() - 1));
+            }
             throw ex;
           }
         }
diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java
index 1198d89c40..d617026c68 100644
--- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java
+++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java
@@ -108,7 +108,14 @@ public class FlightEndpointDataQueue implements AutoCloseable {
         if (endpoint != null) {
           return endpoint;
         }
-      } catch (final ExecutionException | InterruptedException | CancellationException e) {
+      } catch (final ExecutionException e) {
+        // Unwrap one layer
+        final Throwable cause = e.getCause();
+        if (cause instanceof FlightRuntimeException) {
+          throw (FlightRuntimeException) cause;
+        }
+        throw AvaticaConnection.HELPER.wrap(e.getMessage(), e);
+      } catch (InterruptedException | CancellationException e) {
         throw AvaticaConnection.HELPER.wrap(e.getMessage(), e);
       }
     }
diff --git a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java
index 0e3e015a04..680803318e 100644
--- a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java
+++ b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java
@@ -39,6 +39,7 @@ import java.sql.SQLTimeoutException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
@@ -49,7 +50,10 @@ import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
 import org.apache.arrow.driver.jdbc.utils.PartitionedFlightSqlProducer;
 import org.apache.arrow.flight.FlightEndpoint;
 import org.apache.arrow.flight.FlightProducer;
+import org.apache.arrow.flight.FlightRuntimeException;
 import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.FlightStatusCode;
+import org.apache.arrow.flight.Location;
 import org.apache.arrow.flight.Ticket;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
@@ -63,6 +67,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.rules.ErrorCollector;
 
 import com.google.common.collect.ImmutableSet;
@@ -351,7 +356,7 @@ public class ResultSetTest {
               .toString(),
           anyOf(is(format("Error while executing SQL \"%s\": Query canceled", query)),
               allOf(containsString(format("Error while executing SQL \"%s\"", query)),
-                  containsString("CANCELLED"))));
+                  anyOf(containsString("CANCELLED"), containsString("Cancelling")))));
     }
   }
 
@@ -455,6 +460,90 @@ public class ResultSetTest {
     }
   }
 
+  @Test
+  public void testPartitionedFlightServerIgnoreFailure() throws Exception {
+    final Schema schema = new Schema(
+            Collections.singletonList(Field.nullablePrimitive("int_column", new ArrowType.Int(32, true))));
+    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      final FlightEndpoint firstEndpoint =
+              new FlightEndpoint(new Ticket("first".getBytes(StandardCharsets.UTF_8)),
+                      Location.forGrpcInsecure("127.0.0.2", 1234),
+                      Location.forGrpcInsecure("127.0.0.3", 1234));
+
+      try (final PartitionedFlightSqlProducer rootProducer = new PartitionedFlightSqlProducer(
+              schema, firstEndpoint);
+           FlightServer rootServer = FlightServer.builder(
+                           allocator, forGrpcInsecure("localhost", 0), rootProducer)
+                   .build()
+                   .start();
+           Connection newConnection = DriverManager.getConnection(String.format(
+                   "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
+                   rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
+           Statement newStatement = newConnection.createStatement()) {
+        final SQLException e = Assertions.assertThrows(SQLException.class, () -> {
+          ResultSet result = newStatement.executeQuery("Select partitioned_data");
+          while (result.next()) {
+          }
+        });
+        final Throwable cause = e.getCause();
+        Assertions.assertTrue(cause instanceof FlightRuntimeException);
+        final FlightRuntimeException fre = (FlightRuntimeException) cause;
+        Assertions.assertEquals(FlightStatusCode.UNAVAILABLE, fre.status().code());
+      }
+    }
+  }
+
+  @Test
+  public void testPartitionedFlightServerAllFailure() throws Exception {
+    // Arrange
+    final Schema schema = new Schema(
+            Collections.singletonList(Field.nullablePrimitive("int_column", new ArrowType.Int(32, true))));
+    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+         VectorSchemaRoot firstPartition = VectorSchemaRoot.create(schema, allocator)) {
+      firstPartition.setRowCount(1);
+      ((IntVector) firstPartition.getVector(0)).set(0, 1);
+
+      // Construct the data-only nodes first.
+      FlightProducer firstProducer = new PartitionedFlightSqlProducer.DataOnlyFlightSqlProducer(
+              new Ticket("first".getBytes(StandardCharsets.UTF_8)), firstPartition);
+
+      final FlightServer.Builder firstBuilder = FlightServer.builder(
+              allocator, forGrpcInsecure("localhost", 0), firstProducer);
+
+      // Run the data-only nodes so that we can get the Locations they are running at.
+      try (FlightServer firstServer = firstBuilder.build()) {
+        firstServer.start();
+        final Location badLocation = Location.forGrpcInsecure("127.0.0.2", 1234);
+        final FlightEndpoint firstEndpoint =
+                new FlightEndpoint(new Ticket("first".getBytes(StandardCharsets.UTF_8)),
+                        badLocation, firstServer.getLocation());
+
+        // Finally start the root node.
+        try (final PartitionedFlightSqlProducer rootProducer = new PartitionedFlightSqlProducer(
+                schema, firstEndpoint);
+             FlightServer rootServer = FlightServer.builder(
+                             allocator, forGrpcInsecure("localhost", 0), rootProducer)
+                     .build()
+                     .start();
+             Connection newConnection = DriverManager.getConnection(String.format(
+                     "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
+                     rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
+             Statement newStatement = newConnection.createStatement();
+             // Act
+             ResultSet result = newStatement.executeQuery("Select partitioned_data")) {
+          List<Integer> resultData = new ArrayList<>();
+          while (result.next()) {
+            resultData.add(result.getInt(1));
+          }
+
+          // Assert
+          assertEquals(firstPartition.getRowCount(), resultData.size());
+          assertTrue(resultData.contains(((IntVector) firstPartition.getVector(0)).get(0)));
+        }
+      }
+    }
+  }
+
   @Test
   public void testShouldRunSelectQueryWithEmptyVectorsEmbedded() throws Exception {
     try (Statement statement = connection.createStatement();