You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/26 18:47:00 UTC
[arrow-adbc] branch main updated: feat(go/adbc/driver/flightsql): implement GetObjects (#383)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 518a45d feat(go/adbc/driver/flightsql): implement GetObjects (#383)
518a45d is described below
commit 518a45d47a406aba6426e14c4e92b3e3ab3c4183
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jan 26 13:46:56 2023 -0500
feat(go/adbc/driver/flightsql): implement GetObjects (#383)
---
c/driver/flightsql/sqlite_flightsql_test.cc | 45 ++-
c/validation/adbc_validation.cc | 77 +++--
c/validation/adbc_validation.h | 13 +
go/adbc/driver/flightsql/flightsql_adbc.go | 420 +++++++++++++++++++++++++++-
4 files changed, 521 insertions(+), 34 deletions(-)
diff --git a/c/driver/flightsql/sqlite_flightsql_test.cc b/c/driver/flightsql/sqlite_flightsql_test.cc
index d6a18e0..71d4e99 100644
--- a/c/driver/flightsql/sqlite_flightsql_test.cc
+++ b/c/driver/flightsql/sqlite_flightsql_test.cc
@@ -26,6 +26,13 @@
using adbc_validation::IsOkStatus;
+#define CHECK_OK(EXPR) \
+ do { \
+ if (auto adbc_status = (EXPR); adbc_status != ADBC_STATUS_OK) { \
+ return adbc_status; \
+ } \
+ } while (false)
+
class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
public:
AdbcStatusCode SetupDatabase(struct AdbcDatabase* database,
@@ -35,11 +42,47 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
return ADBC_STATUS_OK;
}
+ AdbcStatusCode DropTable(struct AdbcConnection* connection, const std::string& name,
+ struct AdbcError* error) const override {
+ adbc_validation::Handle<struct AdbcStatement> statement;
+ CHECK_OK(AdbcStatementNew(connection, &statement.value, error));
+
+ std::string drop = "DROP TABLE IF EXISTS ";
+ drop += name;
+ CHECK_OK(AdbcStatementSetSqlQuery(&statement.value, drop.c_str(), error));
+ CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error));
+
+ CHECK_OK(AdbcStatementRelease(&statement.value, error));
+ return ADBC_STATUS_OK;
+ }
+
+ AdbcStatusCode CreateSampleTable(struct AdbcConnection* connection,
+ const std::string& name,
+ struct AdbcError* error) const override {
+ adbc_validation::Handle<struct AdbcStatement> statement;
+ CHECK_OK(AdbcStatementNew(connection, &statement.value, error));
+
+ std::string create = "CREATE TABLE ";
+ create += name;
+ create += " (int64s INT, strings TEXT)";
+ CHECK_OK(AdbcStatementSetSqlQuery(&statement.value, create.c_str(), error));
+ CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error));
+
+ std::string insert = "INSERT INTO ";
+ insert += name;
+ insert += " VALUES (42, 'foo'), (-42, NULL), (NULL, '')";
+ CHECK_OK(AdbcStatementSetSqlQuery(&statement.value, insert.c_str(), error));
+ CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error));
+
+ CHECK_OK(AdbcStatementRelease(&statement.value, error));
+ return ADBC_STATUS_OK;
+ }
+
std::string BindParameter(int index) const override { return "?"; }
bool supports_concurrent_statements() const override { return true; }
bool supports_transactions() const override { return false; }
bool supports_get_sql_info() const override { return true; }
- bool supports_get_objects() const override { return false; }
+ bool supports_get_objects() const override { return true; }
bool supports_bulk_ingest() const override { return false; }
bool supports_partitioned_data() const override { return true; }
bool supports_dynamic_parameter_binding() const override { return true; }
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index bb83013..cd6cd1e 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -53,6 +53,50 @@ namespace {
} while (false)
} // namespace
+//------------------------------------------------------------
+// DriverQuirks
+
+AdbcStatusCode DoIngestSampleTable(struct AdbcConnection* connection,
+ const std::string& name, struct AdbcError* error) {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ CHECK_OK(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
+ {"strings", NANOARROW_TYPE_STRING}}));
+ CHECK_OK((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt},
+ {"foo", std::nullopt, ""})));
+
+ Handle<struct AdbcStatement> statement;
+ CHECK_OK(AdbcStatementNew(connection, &statement.value, error));
+ CHECK_OK(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), error));
+ CHECK_OK(AdbcStatementBind(&statement.value, &array.value, &schema.value, error));
+ CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error));
+ CHECK_OK(AdbcStatementRelease(&statement.value, error));
+ return ADBC_STATUS_OK;
+}
+
+void IngestSampleTable(struct AdbcConnection* connection, struct AdbcError* error) {
+ ASSERT_THAT(DoIngestSampleTable(connection, "bulk_ingest", error), IsOkStatus(error));
+}
+
+AdbcStatusCode DriverQuirks::EnsureSampleTable(struct AdbcConnection* connection,
+ const std::string& name,
+ struct AdbcError* error) const {
+ CHECK_OK(DropTable(connection, name, error));
+ return CreateSampleTable(connection, name, error);
+}
+
+AdbcStatusCode DriverQuirks::CreateSampleTable(struct AdbcConnection* connection,
+ const std::string& name,
+ struct AdbcError* error) const {
+ if (!supports_bulk_ingest()) {
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+ return DoIngestSampleTable(connection, name, error);
+}
+
//------------------------------------------------------------
// Tests of AdbcDatabase
@@ -193,30 +237,6 @@ void ConnectionTest::TestAutocommitToggle() {
//------------------------------------------------------------
// Tests of metadata
-void IngestSampleTable(struct AdbcConnection* connection, struct AdbcError* error) {
- Handle<struct ArrowSchema> schema;
- Handle<struct ArrowArray> array;
- struct ArrowError na_error;
- ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
- {"strings", NANOARROW_TYPE_STRING}}),
- IsOkErrno());
- ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
- {42, -42, std::nullopt},
- {"foo", std::nullopt, ""})),
- IsOkErrno());
-
- Handle<struct AdbcStatement> statement;
- ASSERT_THAT(AdbcStatementNew(connection, &statement.value, error), IsOkStatus(error));
- ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
- "bulk_ingest", error),
- IsOkStatus(error));
- ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, error),
- IsOkStatus(error));
- ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error),
- IsOkStatus(error));
- ASSERT_THAT(AdbcStatementRelease(&statement.value, error), IsOkStatus(error));
-}
-
void ConnectionTest::TestMetadataGetInfo() {
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
@@ -513,9 +533,8 @@ void ConnectionTest::TestMetadataGetObjectsTables() {
GTEST_SKIP();
}
- ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+ ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
- ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
std::vector<std::pair<const char*, bool>> test_cases = {
{nullptr, true}, {"bulk_%", true}, {"asdf%", false}};
@@ -593,9 +612,8 @@ void ConnectionTest::TestMetadataGetObjectsTablesTypes() {
GTEST_SKIP();
}
- ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+ ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
- ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
std::vector<const char*> table_types(2);
table_types[0] = "this_table_type_does_not_exist";
@@ -668,9 +686,8 @@ void ConnectionTest::TestMetadataGetObjectsColumns() {
ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
- ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+ ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
- ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
struct TestCase {
std::optional<std::string> filter;
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index c57602b..5cba030 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -44,6 +44,19 @@ class DriverQuirks {
return ADBC_STATUS_OK;
}
+ virtual AdbcStatusCode EnsureSampleTable(struct AdbcConnection* connection,
+ const std::string& name,
+ struct AdbcError* error) const;
+
+ /// \brief Create a table of sample data with a fixed schema for testing.
+ ///
+ /// The table should have two columns:
+ /// - "int64s" with Arrow type int64.
+ /// - "strings" with Arrow type utf8.
+ virtual AdbcStatusCode CreateSampleTable(struct AdbcConnection* connection,
+ const std::string& name,
+ struct AdbcError* error) const;
+
/// \brief Return the SQL to reference the bind parameter of the given index
virtual std::string BindParameter(int index) const { return "?"; }
diff --git a/go/adbc/driver/flightsql/flightsql_adbc.go b/go/adbc/driver/flightsql/flightsql_adbc.go
index d806740..3d483eb 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc.go
@@ -32,12 +32,14 @@
package flightsql
import (
+ "bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/url"
+ "regexp"
"runtime/debug"
"strings"
"time"
@@ -47,6 +49,8 @@ import (
"github.com/apache/arrow/go/v11/arrow/array"
"github.com/apache/arrow/go/v11/arrow/flight"
"github.com/apache/arrow/go/v11/arrow/flight/flightsql"
+ "github.com/apache/arrow/go/v11/arrow/flight/flightsql/schema_ref"
+ "github.com/apache/arrow/go/v11/arrow/ipc"
"github.com/apache/arrow/go/v11/arrow/memory"
"github.com/bluele/gcache"
"golang.org/x/exp/maps"
@@ -564,10 +568,420 @@ func (c *cnxn) GetInfo(ctx context.Context, infoCodes []adbc.InfoCode) (array.Re
// All non-empty, non-nil strings should be a search pattern (as described
// earlier).
func (c *cnxn) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (array.RecordReader, error) {
- return nil, adbc.Error{
- Msg: "Not implemented: GetObjects",
- Code: adbc.StatusNotImplemented,
+ ctx = metadata.NewOutgoingContext(ctx, c.hdrs)
+ g := getObjects{ctx: ctx, depth: depth, catalog: catalog, dbSchema: dbSchema, tableName: tableName, columnName: columnName, tableType: tableType}
+ if err := g.init(c); err != nil {
+ return nil, err
+ }
+ defer g.release()
+
+ // To avoid an N+1 query problem, we assume result sets here will fit in memory and build up a single response.
+ info, err := c.cl.GetCatalogs(ctx)
+ if err != nil {
+ return nil, adbcFromFlightStatus(err)
+ }
+
+ rdr, err := c.readInfo(ctx, schema_ref.Catalogs, info)
+ if err != nil {
+ return nil, adbcFromFlightStatus(err)
+ }
+ defer rdr.Release()
+
+ for rdr.Next() {
+ arr := rdr.Record().Column(0).(*array.String)
+ for i := 0; i < arr.Len(); i++ {
+ catalogName := arr.Value(i)
+ g.appendCatalog(catalogName)
+ }
+ }
+
+ if err = rdr.Err(); err != nil {
+ return nil, adbcFromFlightStatus(err)
+ }
+
+ return g.finish()
+}
+
+// Helper to store state needed for GetObjects
+type getObjects struct {
+ ctx context.Context
+ depth adbc.ObjectDepth
+ catalog *string
+ dbSchema *string
+ tableName *string
+ columnName *string
+ tableType []string
+
+ builder *array.RecordBuilder
+ schemaLookup map[string][]string
+ tableLookup map[catalogAndSchema][]tableInfo
+ catalogPattern *regexp.Regexp
+ columnNamePattern *regexp.Regexp
+
+ catalogNameBuilder *array.StringBuilder
+ catalogDbSchemasBuilder *array.ListBuilder
+ catalogDbSchemasItems *array.StructBuilder
+ dbSchemaNameBuilder *array.StringBuilder
+ dbSchemaTablesBuilder *array.ListBuilder
+ dbSchemaTablesItems *array.StructBuilder
+ tableNameBuilder *array.StringBuilder
+ tableTypeBuilder *array.StringBuilder
+ tableColumnsBuilder *array.ListBuilder
+ tableColumnsItems *array.StructBuilder
+ columnNameBuilder *array.StringBuilder
+ ordinalPositionBuilder *array.Int32Builder
+ remarksBuilder *array.StringBuilder
+ xdbcDataTypeBuilder *array.Int16Builder
+ xdbcTypeNameBuilder *array.StringBuilder
+ xdbcColumnSizeBuilder *array.Int32Builder
+ xdbcDecimalDigitsBuilder *array.Int16Builder
+ xdbcNumPrecRadixBuilder *array.Int16Builder
+ xdbcNullableBuilder *array.Int16Builder
+ xdbcColumnDefBuilder *array.StringBuilder
+ xdbcSqlDataTypeBuilder *array.Int16Builder
+ xdbcDatetimeSubBuilder *array.Int16Builder
+ xdbcCharOctetLengthBuilder *array.Int32Builder
+ xdbcIsNullableBuilder *array.StringBuilder
+ xdbcScopeCatalogBuilder *array.StringBuilder
+ xdbcScopeSchemaBuilder *array.StringBuilder
+ xdbcScopeTableBuilder *array.StringBuilder
+ xdbcIsAutoincrementBuilder *array.BooleanBuilder
+ xdbcIsGeneratedcolumnBuilder *array.BooleanBuilder
+ tableConstraintsBuilder *array.ListBuilder
+}
+
+func (g *getObjects) init(c *cnxn) error {
+ if catalogToDbSchemas, err := c.getObjectsDbSchemas(g.ctx, g.depth, g.catalog, g.dbSchema); err != nil {
+ return err
+ } else {
+ g.schemaLookup = catalogToDbSchemas
+ }
+
+ if tableLookup, err := c.getObjectsTables(g.ctx, g.depth, g.catalog, g.dbSchema, g.tableName, g.columnName, g.tableType); err != nil {
+ return err
+ } else {
+ g.tableLookup = tableLookup
+ }
+
+ if catalogPattern, err := patternToRegexp(g.catalog); err != nil {
+ return adbc.Error{
+ Msg: err.Error(),
+ Code: adbc.StatusInvalidArgument,
+ }
+ } else {
+ g.catalogPattern = catalogPattern
+ }
+ if columnNamePattern, err := patternToRegexp(g.columnName); err != nil {
+ return adbc.Error{
+ Msg: err.Error(),
+ Code: adbc.StatusInvalidArgument,
+ }
+ } else {
+ g.columnNamePattern = columnNamePattern
+ }
+
+ g.builder = array.NewRecordBuilder(c.db.alloc, adbc.GetObjectsSchema)
+ g.catalogNameBuilder = g.builder.Field(0).(*array.StringBuilder)
+ g.catalogDbSchemasBuilder = g.builder.Field(1).(*array.ListBuilder)
+ g.catalogDbSchemasItems = g.catalogDbSchemasBuilder.ValueBuilder().(*array.StructBuilder)
+ g.dbSchemaNameBuilder = g.catalogDbSchemasItems.FieldBuilder(0).(*array.StringBuilder)
+ g.dbSchemaTablesBuilder = g.catalogDbSchemasItems.FieldBuilder(1).(*array.ListBuilder)
+ g.dbSchemaTablesItems = g.dbSchemaTablesBuilder.ValueBuilder().(*array.StructBuilder)
+ g.tableNameBuilder = g.dbSchemaTablesItems.FieldBuilder(0).(*array.StringBuilder)
+ g.tableTypeBuilder = g.dbSchemaTablesItems.FieldBuilder(1).(*array.StringBuilder)
+ g.tableColumnsBuilder = g.dbSchemaTablesItems.FieldBuilder(2).(*array.ListBuilder)
+ g.tableColumnsItems = g.tableColumnsBuilder.ValueBuilder().(*array.StructBuilder)
+ g.columnNameBuilder = g.tableColumnsItems.FieldBuilder(0).(*array.StringBuilder)
+ g.ordinalPositionBuilder = g.tableColumnsItems.FieldBuilder(1).(*array.Int32Builder)
+ g.remarksBuilder = g.tableColumnsItems.FieldBuilder(2).(*array.StringBuilder)
+ g.xdbcDataTypeBuilder = g.tableColumnsItems.FieldBuilder(3).(*array.Int16Builder)
+ g.xdbcTypeNameBuilder = g.tableColumnsItems.FieldBuilder(4).(*array.StringBuilder)
+ g.xdbcColumnSizeBuilder = g.tableColumnsItems.FieldBuilder(5).(*array.Int32Builder)
+ g.xdbcDecimalDigitsBuilder = g.tableColumnsItems.FieldBuilder(6).(*array.Int16Builder)
+ g.xdbcNumPrecRadixBuilder = g.tableColumnsItems.FieldBuilder(7).(*array.Int16Builder)
+ g.xdbcNullableBuilder = g.tableColumnsItems.FieldBuilder(8).(*array.Int16Builder)
+ g.xdbcColumnDefBuilder = g.tableColumnsItems.FieldBuilder(9).(*array.StringBuilder)
+ g.xdbcSqlDataTypeBuilder = g.tableColumnsItems.FieldBuilder(10).(*array.Int16Builder)
+ g.xdbcDatetimeSubBuilder = g.tableColumnsItems.FieldBuilder(11).(*array.Int16Builder)
+ g.xdbcCharOctetLengthBuilder = g.tableColumnsItems.FieldBuilder(12).(*array.Int32Builder)
+ g.xdbcIsNullableBuilder = g.tableColumnsItems.FieldBuilder(13).(*array.StringBuilder)
+ g.xdbcScopeCatalogBuilder = g.tableColumnsItems.FieldBuilder(14).(*array.StringBuilder)
+ g.xdbcScopeSchemaBuilder = g.tableColumnsItems.FieldBuilder(15).(*array.StringBuilder)
+ g.xdbcScopeTableBuilder = g.tableColumnsItems.FieldBuilder(16).(*array.StringBuilder)
+ g.xdbcIsAutoincrementBuilder = g.tableColumnsItems.FieldBuilder(17).(*array.BooleanBuilder)
+ g.xdbcIsGeneratedcolumnBuilder = g.tableColumnsItems.FieldBuilder(18).(*array.BooleanBuilder)
+ g.tableConstraintsBuilder = g.dbSchemaTablesItems.FieldBuilder(3).(*array.ListBuilder)
+
+ return nil
+}
+
+func (g *getObjects) release() {
+ g.builder.Release()
+}
+
+func (g *getObjects) finish() (array.RecordReader, error) {
+ record := g.builder.NewRecord()
+ result, err := array.NewRecordReader(g.builder.Schema(), []arrow.Record{record})
+ if err != nil {
+ return nil, adbc.Error{
+ Msg: err.Error(),
+ Code: adbc.StatusInternal,
+ }
+ }
+ return result, nil
+}
+
+func (g *getObjects) appendCatalog(catalogName string) {
+ if g.catalogPattern != nil && !g.catalogPattern.MatchString(catalogName) {
+ return
+ }
+ g.catalogNameBuilder.Append(catalogName)
+
+ if g.depth == adbc.ObjectDepthCatalogs {
+ g.catalogDbSchemasBuilder.AppendNull()
+ return
+ }
+ g.catalogDbSchemasBuilder.Append(true)
+
+ for _, dbSchemaName := range g.schemaLookup[catalogName] {
+ g.appendDbSchema(catalogName, dbSchemaName)
+ }
+}
+
+func (g *getObjects) appendDbSchema(catalogName, dbSchemaName string) {
+ g.dbSchemaNameBuilder.Append(dbSchemaName)
+ g.catalogDbSchemasItems.Append(true)
+
+ if g.depth == adbc.ObjectDepthDBSchemas {
+ g.dbSchemaTablesBuilder.AppendNull()
+ return
+ }
+ g.dbSchemaTablesBuilder.Append(true)
+
+ for _, tableInfo := range g.tableLookup[catalogAndSchema{
+ catalog: catalogName,
+ schema: dbSchemaName,
+ }] {
+ g.appendTableInfo(tableInfo)
+ }
+}
+
+func (g *getObjects) appendTableInfo(tableInfo tableInfo) {
+ g.tableNameBuilder.Append(tableInfo.name)
+ g.tableTypeBuilder.Append(tableInfo.tableType)
+ g.dbSchemaTablesItems.Append(true)
+
+ if g.depth == adbc.ObjectDepthTables {
+ g.tableColumnsBuilder.AppendNull()
+ g.tableConstraintsBuilder.AppendNull()
+ return
+ }
+ g.tableColumnsBuilder.Append(true)
+ // TODO: unimplemented for now
+ g.tableConstraintsBuilder.Append(true)
+
+ for colIndex, column := range tableInfo.schema.Fields() {
+ if g.columnNamePattern != nil && !g.columnNamePattern.MatchString(column.Name) {
+ continue
+ }
+ g.columnNameBuilder.Append(column.Name)
+ g.ordinalPositionBuilder.Append(int32(colIndex + 1))
+ g.remarksBuilder.AppendNull()
+ g.xdbcDataTypeBuilder.AppendNull()
+ g.xdbcTypeNameBuilder.AppendNull()
+ g.xdbcColumnSizeBuilder.AppendNull()
+ g.xdbcDecimalDigitsBuilder.AppendNull()
+ g.xdbcNumPrecRadixBuilder.AppendNull()
+ g.xdbcNullableBuilder.AppendNull()
+ g.xdbcColumnDefBuilder.AppendNull()
+ g.xdbcSqlDataTypeBuilder.AppendNull()
+ g.xdbcDatetimeSubBuilder.AppendNull()
+ g.xdbcCharOctetLengthBuilder.AppendNull()
+ g.xdbcIsNullableBuilder.AppendNull()
+ g.xdbcScopeCatalogBuilder.AppendNull()
+ g.xdbcScopeSchemaBuilder.AppendNull()
+ g.xdbcScopeTableBuilder.AppendNull()
+ g.xdbcIsAutoincrementBuilder.AppendNull()
+ g.xdbcIsGeneratedcolumnBuilder.AppendNull()
+
+ g.tableColumnsItems.Append(true)
+ }
+}
+
+// Helper function to read and validate a metadata stream
+func (c *cnxn) readInfo(ctx context.Context, expectedSchema *arrow.Schema, info *flight.FlightInfo) (array.RecordReader, error) {
+ // use a default queueSize for the reader
+ rdr, err := newRecordReader(ctx, c.db.alloc, c.cl, info, c.clientCache, 5)
+ if err != nil {
+ return nil, adbcFromFlightStatus(err)
+ }
+
+ if !rdr.Schema().Equal(expectedSchema) {
+ rdr.Release()
+ return nil, adbc.Error{
+ Msg: fmt.Sprintf("Invalid schema returned for: expected %s, got %s", expectedSchema.String(), rdr.Schema().String()),
+ Code: adbc.StatusInternal,
+ }
+ }
+ return rdr, nil
+}
+
+// Helper function that compiles a SQL-style pattern (%, _) to a regex
+func patternToRegexp(pattern *string) (*regexp.Regexp, error) {
+ if pattern == nil {
+ return nil, nil
+ }
+
+ var builder strings.Builder
+ if _, err := builder.WriteString("^"); err != nil {
+ return nil, err
+ }
+ for _, c := range *pattern {
+ switch {
+ case c == rune('_'):
+ if _, err := builder.WriteString("."); err != nil {
+ return nil, err
+ }
+ case c == rune('%'):
+ if _, err := builder.WriteString(".*"); err != nil {
+ return nil, err
+ }
+ default:
+ if _, err := builder.WriteString(regexp.QuoteMeta(string([]rune{c}))); err != nil {
+ return nil, err
+ }
+ }
+ }
+ if _, err := builder.WriteString("$"); err != nil {
+ return nil, err
+ }
+ return regexp.Compile(builder.String())
+}
+
+// Helper function to build up a map of catalogs to DB schemas
+func (c *cnxn) getObjectsDbSchemas(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string) (result map[string][]string, err error) {
+ if depth == adbc.ObjectDepthCatalogs {
+ return
+ }
+ result = make(map[string][]string)
+ // Pre-populate the map of which schemas are in which catalogs
+ info, err := c.cl.GetDBSchemas(ctx, &flightsql.GetDBSchemasOpts{DbSchemaFilterPattern: dbSchema})
+ if err != nil {
+ return nil, adbcFromFlightStatus(err)
+ }
+
+ rdr, err := c.readInfo(ctx, schema_ref.DBSchemas, info)
+ if err != nil {
+ return nil, adbcFromFlightStatus(err)
+ }
+ defer rdr.Release()
+
+ for rdr.Next() {
+ // Nullable
+ catalog := rdr.Record().Column(0).(*array.String)
+ // Non-nullable
+ dbSchema := rdr.Record().Column(1).(*array.String)
+
+ for i := 0; i < catalog.Len(); i++ {
+ catalogName := ""
+ if !catalog.IsNull(i) {
+ catalogName = catalog.Value(i)
+ }
+ result[catalogName] = append(result[catalogName], dbSchema.Value(i))
+ }
+ }
+
+ if rdr.Err() != nil {
+ result = nil
+ err = adbcFromFlightStatus(rdr.Err())
+ }
+ return
+}
+
+type catalogAndSchema struct {
+ catalog, schema string
+}
+
+type tableInfo struct {
+ name, tableType string
+ schema *arrow.Schema
+}
+
+func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (result map[catalogAndSchema][]tableInfo, err error) {
+ if depth == adbc.ObjectDepthCatalogs || depth == adbc.ObjectDepthDBSchemas {
+ return
+ }
+ result = make(map[catalogAndSchema][]tableInfo)
+
+ // Pre-populate the map of which schemas are in which catalogs
+ includeSchema := depth == adbc.ObjectDepthAll || depth == adbc.ObjectDepthColumns
+ info, err := c.cl.GetTables(ctx, &flightsql.GetTablesOpts{
+ TableNameFilterPattern: tableName,
+ TableTypes: tableType,
+ IncludeSchema: includeSchema,
+ })
+ if err != nil {
+ return nil, adbcFromFlightStatus(err)
+ }
+
+ expectedSchema := schema_ref.Tables
+ if includeSchema {
+ expectedSchema = schema_ref.TablesWithIncludedSchema
+ }
+ rdr, err := c.readInfo(ctx, expectedSchema, info)
+ if err != nil {
+ return nil, adbcFromFlightStatus(err)
+ }
+ defer rdr.Release()
+
+ for rdr.Next() {
+ // Nullable
+ catalog := rdr.Record().Column(0).(*array.String)
+ dbSchema := rdr.Record().Column(1).(*array.String)
+ // Non-nullable
+ tableName := rdr.Record().Column(2).(*array.String)
+ tableType := rdr.Record().Column(3).(*array.String)
+
+ for i := 0; i < catalog.Len(); i++ {
+ catalogName := ""
+ dbSchemaName := ""
+ if !catalog.IsNull(i) {
+ catalogName = catalog.Value(i)
+ }
+ if !dbSchema.IsNull(i) {
+ dbSchemaName = dbSchema.Value(i)
+ }
+ key := catalogAndSchema{
+ catalog: catalogName,
+ schema: dbSchemaName,
+ }
+
+ var schema *arrow.Schema
+ if includeSchema {
+ reader, err := ipc.NewReader(bytes.NewReader(rdr.Record().Column(4).(*array.Binary).Value(i)))
+ if err != nil {
+ return nil, adbc.Error{
+ Msg: err.Error(),
+ Code: adbc.StatusInternal,
+ }
+ }
+ schema = reader.Schema()
+ reader.Release()
+ }
+
+ result[key] = append(result[key], tableInfo{
+ name: tableName.Value(i),
+ tableType: tableType.Value(i),
+ schema: schema,
+ })
+ }
+ }
+
+ if rdr.Err() != nil {
+ result = nil
+ err = adbcFromFlightStatus(rdr.Err())
}
+ return
}
func (c *cnxn) GetTableSchema(ctx context.Context, catalog *string, dbSchema *string, tableName string) (*arrow.Schema, error) {