You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/06 22:11:29 UTC

[GitHub] [arrow] jduo commented on a change in pull request #9368: [WIP] [POC] Flight SQL

jduo commented on a change in pull request #9368:
URL: https://github.com/apache/arrow/pull/9368#discussion_r664909672



##########
File path: java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClientUtils.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.sql;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.arrow.flight.Action;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+
+import io.grpc.Status;
+
+/**
+ * Client side utilities to work with Flight SQL semantics.
+ */
+public final class FlightSqlClientUtils {
+
+  /**
+   * Asks a Flight SQL enabled endpoint for its list of tables.
+   *
+   * <p>A {@code CommandGetTables} message is assembled from the arguments, packed into a command
+   * descriptor, and handed to {@code client.getInfo}.
+   *
+   * @param client              The Flight Client used to issue the request.
+   * @param catalog             The catalog; left unset on the command when null.
+   * @param schemaFilterPattern The schema filter pattern; left unset on the command when null.
+   * @param tableFilterPattern  The table name filter pattern; left unset on the command when null.
+   * @param tableTypes          The table types to include; none are added when null.
+   * @param includeSchema       True to include the schema upon return, false to not include the schema.
+   * @return a FlightInfo object representing the stream(s) to fetch.
+   */
+  public static FlightInfo getTables(FlightClient client, String catalog, String schemaFilterPattern,
+          String tableFilterPattern, List<String> tableTypes, boolean includeSchema) {
+    final FlightSql.CommandGetTables.Builder command =
+            FlightSql.CommandGetTables.newBuilder().setIncludeSchema(includeSchema);
+
+    // Each optional argument is only copied onto the command when the caller supplied it.
+    if (tableTypes != null) {
+      command.addAllTableTypes(tableTypes);
+    }
+    if (tableFilterPattern != null) {
+      command.setTableNameFilterPattern(tableFilterPattern);
+    }
+    if (schemaFilterPattern != null) {
+      command.setSchemaFilterPattern(schemaFilterPattern);
+    }
+    if (catalog != null) {
+      command.setCatalog(catalog);
+    }
+
+    final byte[] packedCommand = Any.pack(command.build()).toByteArray();
+    return client.getInfo(FlightDescriptor.command(packedCommand));
+  }
+
+  /**
+   * Creates a {@code FlightSqlPreparedStatement} for the given query.
+   *
+   * <p>This simply delegates to the {@code FlightSqlPreparedStatement} constructor with the
+   * same arguments.
+   *
+   * @param client The Flight Client.
+   * @param query  The query to prepare.
+   * @return Metadata and handles to the prepared statement which exists on the server.
+   */
+  public static FlightSqlPreparedStatement getPreparedStatement(FlightClient client, String query) {
+    final FlightSqlPreparedStatement statement = new FlightSqlPreparedStatement(client, query);
+    return statement;
+  }
+
+  /**
+   * Helper class to encapsulate Flight SQL prepared statement logic.
+   */
+  public static class FlightSqlPreparedStatement implements Closeable {
+    private final FlightClient client;
+    private final ActionCreatePreparedStatementResult preparedStatementResult;
+    private long invocationCount;
+    private boolean isClosed;
+    private Schema resultSetSchema = null;
+    private Schema parameterSchema = null;
+
+    /**
+     * Constructor.
+     *
+     * <p>Sends a "GetPreparedStatement" action carrying the query to the server and keeps the
+     * first result of that action as this statement's server-side description.
+     *
+     * @param client The client. FlightSqlPreparedStatement does not maintain this resource.
+     * @param sql    The query.
+     */
+    public FlightSqlPreparedStatement(FlightClient client, String sql) {
+      this.client = client;
+      this.invocationCount = 0;
+      this.isClosed = false;
+
+      // Build the request message first, then wrap it in the action sent to the server.
+      final FlightSql.ActionCreatePreparedStatementRequest request =
+              FlightSql.ActionCreatePreparedStatementRequest.newBuilder().setQuery(sql).build();
+      final Action createAction = new Action("GetPreparedStatement", Any.pack(request).toByteArray());
+
+      // Only the first result of the action is consumed.
+      final Iterator<Result> responses = client.doAction(createAction);
+      final Result firstResponse = responses.next();
+      this.preparedStatementResult = FlightSqlUtils.unpackAndParseOrThrow(
+              firstResponse.getBody(), ActionCreatePreparedStatementResult.class);
+    }
+
+    /**
+     * Returns the Schema of the resultset.
+     *
+     * <p>The schema is deserialized from the server's response on first use and cached afterwards.
+     *
+     * @return the Schema of the resultset, or null if the response carried no dataset schema bytes.
+     */
+    public Schema getResultSetSchema() {
+      // Protobuf bytes accessors return an empty ByteString rather than null for an unset field,
+      // so emptiness (not nullness) is what signals that no schema was sent.
+      if (resultSetSchema == null && !preparedStatementResult.getDatasetSchema().isEmpty()) {
+        resultSetSchema = Schema.deserialize(preparedStatementResult.getDatasetSchema().asReadOnlyByteBuffer());
+      }
+      return resultSetSchema;
+    }
+
+    /**
+     * Returns the Schema of the parameters.
+     *
+     * <p>The schema is deserialized from the server's response on first use and cached afterwards.
+     *
+     * @return the Schema of the parameters, or null if the response carried no parameter schema bytes.
+     */
+    public Schema getParameterSchema() {
+      // Protobuf bytes accessors return an empty ByteString rather than null for an unset field,
+      // so emptiness (not nullness) is what signals that no schema was sent.
+      if (parameterSchema == null && !preparedStatementResult.getParameterSchema().isEmpty()) {
+        parameterSchema = Schema.deserialize(preparedStatementResult.getParameterSchema().asReadOnlyByteBuffer());
+      }
+      return parameterSchema;
+    }
+
+    /**
+     * Executes the prepared statement query on the server.
+     *
+     * @return a FlightInfo object representing the stream(s) to fetch.
+     * @throws IOException if the PreparedStatement is closed.
+     */
+    public FlightInfo executeQuery() throws IOException {
+      if (isClosed) {
+        throw new IOException("Prepared statement has already been closed on the server.");

Review comment:
       Nit: This isn't an IO error and is more of an IllegalStateException.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org