You are viewing a plain text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@arrow.apache.org by li...@apache.org on 2024/02/21 00:01:31 UTC
(arrow) branch main updated: GH-38573: [Java][FlightRPC] Try all locations in JDBC driver (#40104)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new aa6b398592 GH-38573: [Java][FlightRPC] Try all locations in JDBC driver (#40104)
aa6b398592 is described below
commit aa6b39859261ab2f116d4d971127c53a8a5be2a5
Author: David Li <li...@gmail.com>
AuthorDate: Tue Feb 20 19:01:25 2024 -0500
GH-38573: [Java][FlightRPC] Try all locations in JDBC driver (#40104)
### Rationale for this change
This brings the JDBC driver up to par with other Flight SQL clients.
### What changes are included in this PR?
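Make the Flight SQL JDBC driver try each Location of a FlightEndpoint in turn. If a Location fails, fall back to the next one, instead of trying only the first. If every Location fails, report all connection errors: the first is thrown and the rest are attached as suppressed exceptions.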
### Are these changes tested?
Yes
### Are there any user-facing changes?
No
* Closes: #38573
Authored-by: David Li <li...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
.../java/org/apache/arrow/flight/FlightClient.java | 3 +-
.../jdbc/client/ArrowFlightSqlClientHandler.java | 53 +++++++++----
.../driver/jdbc/utils/FlightEndpointDataQueue.java | 9 ++-
.../apache/arrow/driver/jdbc/ResultSetTest.java | 91 +++++++++++++++++++++-
4 files changed, 137 insertions(+), 19 deletions(-)
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
index 980a762e39..49f9af4ebf 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
@@ -909,7 +909,8 @@ public class FlightClient implements AutoCloseable {
builder
.maxTraceEvents(MAX_CHANNEL_TRACE_EVENTS)
- .maxInboundMessageSize(maxInboundMessageSize);
+ .maxInboundMessageSize(maxInboundMessageSize)
+ .maxInboundMetadataSize(maxInboundMessageSize);
return new FlightClient(allocator, builder.build(), middleware);
}
}
diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
index 234820bd41..1b03f927d7 100644
--- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
+++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
@@ -116,26 +116,47 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
} else {
// Clone the builder and then set the new endpoint on it.
- // GH-38573: This code currently only tries the first Location and treats a failure as fatal.
- // This should be changed to try other Locations that are available.
-
+
// GH-38574: Currently a new FlightClient will be made for each partition that returns a non-empty Location
// then disposed of. It may be better to cache clients because a server may report the same Locations.
// It would also be good to identify when the reported location is the same as the original connection's
// Location and skip creating a FlightClient in that scenario.
- final URI endpointUri = endpoint.getLocations().get(0).getUri();
- final Builder builderForEndpoint = new Builder(ArrowFlightSqlClientHandler.this.builder)
- .withHost(endpointUri.getHost())
- .withPort(endpointUri.getPort())
- .withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
-
- final ArrowFlightSqlClientHandler endpointHandler = builderForEndpoint.build();
- try {
- endpoints.add(new CloseableEndpointStreamPair(
- endpointHandler.sqlClient.getStream(endpoint.getTicket(),
- endpointHandler.getOptions()), endpointHandler.sqlClient));
- } catch (Exception ex) {
- AutoCloseables.close(endpointHandler);
+ List<Exception> exceptions = new ArrayList<>();
+ CloseableEndpointStreamPair stream = null;
+ for (Location location : endpoint.getLocations()) {
+ final URI endpointUri = location.getUri();
+ final Builder builderForEndpoint = new Builder(ArrowFlightSqlClientHandler.this.builder)
+ .withHost(endpointUri.getHost())
+ .withPort(endpointUri.getPort())
+ .withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
+
+ ArrowFlightSqlClientHandler endpointHandler = null;
+ try {
+ endpointHandler = builderForEndpoint.build();
+ stream = new CloseableEndpointStreamPair(
+ endpointHandler.sqlClient.getStream(endpoint.getTicket(),
+ endpointHandler.getOptions()), endpointHandler.sqlClient);
+ // Make sure we actually get data from the server
+ stream.getStream().getSchema();
+ } catch (Exception ex) {
+ if (endpointHandler != null) {
+ AutoCloseables.close(endpointHandler);
+ }
+ exceptions.add(ex);
+ continue;
+ }
+ break;
+ }
+ if (stream != null) {
+ endpoints.add(stream);
+ } else if (exceptions.isEmpty()) {
+ // This should never happen...
+ throw new IllegalStateException("Could not connect to endpoint and no errors occurred");
+ } else {
+ Exception ex = exceptions.remove(0);
+ while (!exceptions.isEmpty()) {
+ ex.addSuppressed(exceptions.remove(exceptions.size() - 1));
+ }
throw ex;
}
}
diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java
index 1198d89c40..d617026c68 100644
--- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java
+++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java
@@ -108,7 +108,14 @@ public class FlightEndpointDataQueue implements AutoCloseable {
if (endpoint != null) {
return endpoint;
}
- } catch (final ExecutionException | InterruptedException | CancellationException e) {
+ } catch (final ExecutionException e) {
+ // Unwrap one layer
+ final Throwable cause = e.getCause();
+ if (cause instanceof FlightRuntimeException) {
+ throw (FlightRuntimeException) cause;
+ }
+ throw AvaticaConnection.HELPER.wrap(e.getMessage(), e);
+ } catch (InterruptedException | CancellationException e) {
throw AvaticaConnection.HELPER.wrap(e.getMessage(), e);
}
}
diff --git a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java
index 0e3e015a04..680803318e 100644
--- a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java
+++ b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java
@@ -39,6 +39,7 @@ import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@@ -49,7 +50,10 @@ import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
import org.apache.arrow.driver.jdbc.utils.PartitionedFlightSqlProducer;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightProducer;
+import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.FlightStatusCode;
+import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -63,6 +67,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import org.junit.rules.ErrorCollector;
import com.google.common.collect.ImmutableSet;
@@ -351,7 +356,7 @@ public class ResultSetTest {
.toString(),
anyOf(is(format("Error while executing SQL \"%s\": Query canceled", query)),
allOf(containsString(format("Error while executing SQL \"%s\"", query)),
- containsString("CANCELLED"))));
+ anyOf(containsString("CANCELLED"), containsString("Cancelling")))));
}
}
@@ -455,6 +460,90 @@ public class ResultSetTest {
}
}
+ @Test
+ public void testPartitionedFlightServerIgnoreFailure() throws Exception {
+ final Schema schema = new Schema(
+ Collections.singletonList(Field.nullablePrimitive("int_column", new ArrowType.Int(32, true))));
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+ final FlightEndpoint firstEndpoint =
+ new FlightEndpoint(new Ticket("first".getBytes(StandardCharsets.UTF_8)),
+ Location.forGrpcInsecure("127.0.0.2", 1234),
+ Location.forGrpcInsecure("127.0.0.3", 1234));
+
+ try (final PartitionedFlightSqlProducer rootProducer = new PartitionedFlightSqlProducer(
+ schema, firstEndpoint);
+ FlightServer rootServer = FlightServer.builder(
+ allocator, forGrpcInsecure("localhost", 0), rootProducer)
+ .build()
+ .start();
+ Connection newConnection = DriverManager.getConnection(String.format(
+ "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
+ rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
+ Statement newStatement = newConnection.createStatement()) {
+ final SQLException e = Assertions.assertThrows(SQLException.class, () -> {
+ ResultSet result = newStatement.executeQuery("Select partitioned_data");
+ while (result.next()) {
+ }
+ });
+ final Throwable cause = e.getCause();
+ Assertions.assertTrue(cause instanceof FlightRuntimeException);
+ final FlightRuntimeException fre = (FlightRuntimeException) cause;
+ Assertions.assertEquals(FlightStatusCode.UNAVAILABLE, fre.status().code());
+ }
+ }
+ }
+
+ @Test
+ public void testPartitionedFlightServerAllFailure() throws Exception {
+ // Arrange
+ final Schema schema = new Schema(
+ Collections.singletonList(Field.nullablePrimitive("int_column", new ArrowType.Int(32, true))));
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ VectorSchemaRoot firstPartition = VectorSchemaRoot.create(schema, allocator)) {
+ firstPartition.setRowCount(1);
+ ((IntVector) firstPartition.getVector(0)).set(0, 1);
+
+ // Construct the data-only nodes first.
+ FlightProducer firstProducer = new PartitionedFlightSqlProducer.DataOnlyFlightSqlProducer(
+ new Ticket("first".getBytes(StandardCharsets.UTF_8)), firstPartition);
+
+ final FlightServer.Builder firstBuilder = FlightServer.builder(
+ allocator, forGrpcInsecure("localhost", 0), firstProducer);
+
+ // Run the data-only nodes so that we can get the Locations they are running at.
+ try (FlightServer firstServer = firstBuilder.build()) {
+ firstServer.start();
+ final Location badLocation = Location.forGrpcInsecure("127.0.0.2", 1234);
+ final FlightEndpoint firstEndpoint =
+ new FlightEndpoint(new Ticket("first".getBytes(StandardCharsets.UTF_8)),
+ badLocation, firstServer.getLocation());
+
+ // Finally start the root node.
+ try (final PartitionedFlightSqlProducer rootProducer = new PartitionedFlightSqlProducer(
+ schema, firstEndpoint);
+ FlightServer rootServer = FlightServer.builder(
+ allocator, forGrpcInsecure("localhost", 0), rootProducer)
+ .build()
+ .start();
+ Connection newConnection = DriverManager.getConnection(String.format(
+ "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
+ rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
+ Statement newStatement = newConnection.createStatement();
+ // Act
+ ResultSet result = newStatement.executeQuery("Select partitioned_data")) {
+ List<Integer> resultData = new ArrayList<>();
+ while (result.next()) {
+ resultData.add(result.getInt(1));
+ }
+
+ // Assert
+ assertEquals(firstPartition.getRowCount(), resultData.size());
+ assertTrue(resultData.contains(((IntVector) firstPartition.getVector(0)).get(0)));
+ }
+ }
+ }
+ }
+
@Test
public void testShouldRunSelectQueryWithEmptyVectorsEmbedded() throws Exception {
try (Statement statement = connection.createStatement();