Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/26 18:47:00 UTC

[arrow-adbc] branch main updated: feat(go/adbc/driver/flightsql): implement GetObjects (#383)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 518a45d  feat(go/adbc/driver/flightsql): implement GetObjects (#383)
518a45d is described below

commit 518a45d47a406aba6426e14c4e92b3e3ab3c4183
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jan 26 13:46:56 2023 -0500

    feat(go/adbc/driver/flightsql): implement GetObjects (#383)
---
 c/driver/flightsql/sqlite_flightsql_test.cc |  45 ++-
 c/validation/adbc_validation.cc             |  77 +++--
 c/validation/adbc_validation.h              |  13 +
 go/adbc/driver/flightsql/flightsql_adbc.go  | 420 +++++++++++++++++++++++++++-
 4 files changed, 521 insertions(+), 34 deletions(-)
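
For orientation, here is a minimal usage sketch (not part of this commit) of the capability it adds: calling GetObjects through the Go ADBC Connection interface and listing catalog names. It assumes `cnxn` is an adbc.Connection already opened against a Flight SQL server; the listCatalogs helper name and the "bulk_%" table filter are illustrative only.

package example

import (
	"context"
	"fmt"

	"github.com/apache/arrow-adbc/go/adbc"
	"github.com/apache/arrow/go/v11/arrow/array"
)

// listCatalogs (hypothetical helper) asks the driver for metadata down to the
// column level, restricting tables to those matching the SQL-style pattern
// "bulk_%", and prints the catalog names from column 0 of the result.
func listCatalogs(ctx context.Context, cnxn adbc.Connection) error {
	tableFilter := "bulk_%"
	rdr, err := cnxn.GetObjects(ctx, adbc.ObjectDepthAll, nil, nil, &tableFilter, nil, nil)
	if err != nil {
		return err
	}
	defer rdr.Release()

	for rdr.Next() {
		catalogs := rdr.Record().Column(0).(*array.String)
		for i := 0; i < catalogs.Len(); i++ {
			fmt.Println("catalog:", catalogs.Value(i))
		}
	}
	return rdr.Err()
}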

diff --git a/c/driver/flightsql/sqlite_flightsql_test.cc b/c/driver/flightsql/sqlite_flightsql_test.cc
index d6a18e0..71d4e99 100644
--- a/c/driver/flightsql/sqlite_flightsql_test.cc
+++ b/c/driver/flightsql/sqlite_flightsql_test.cc
@@ -26,6 +26,13 @@
 
 using adbc_validation::IsOkStatus;
 
+#define CHECK_OK(EXPR)                                              \
+  do {                                                              \
+    if (auto adbc_status = (EXPR); adbc_status != ADBC_STATUS_OK) { \
+      return adbc_status;                                           \
+    }                                                               \
+  } while (false)
+
 class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
  public:
   AdbcStatusCode SetupDatabase(struct AdbcDatabase* database,
@@ -35,11 +42,47 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
     return ADBC_STATUS_OK;
   }
 
+  AdbcStatusCode DropTable(struct AdbcConnection* connection, const std::string& name,
+                           struct AdbcError* error) const override {
+    adbc_validation::Handle<struct AdbcStatement> statement;
+    CHECK_OK(AdbcStatementNew(connection, &statement.value, error));
+
+    std::string drop = "DROP TABLE IF EXISTS ";
+    drop += name;
+    CHECK_OK(AdbcStatementSetSqlQuery(&statement.value, drop.c_str(), error));
+    CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error));
+
+    CHECK_OK(AdbcStatementRelease(&statement.value, error));
+    return ADBC_STATUS_OK;
+  }
+
+  AdbcStatusCode CreateSampleTable(struct AdbcConnection* connection,
+                                   const std::string& name,
+                                   struct AdbcError* error) const override {
+    adbc_validation::Handle<struct AdbcStatement> statement;
+    CHECK_OK(AdbcStatementNew(connection, &statement.value, error));
+
+    std::string create = "CREATE TABLE ";
+    create += name;
+    create += " (int64s INT, strings TEXT)";
+    CHECK_OK(AdbcStatementSetSqlQuery(&statement.value, create.c_str(), error));
+    CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error));
+
+    std::string insert = "INSERT INTO ";
+    insert += name;
+    insert += " VALUES (42, 'foo'), (-42, NULL), (NULL, '')";
+    CHECK_OK(AdbcStatementSetSqlQuery(&statement.value, insert.c_str(), error));
+    CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error));
+
+    CHECK_OK(AdbcStatementRelease(&statement.value, error));
+    return ADBC_STATUS_OK;
+  }
+
   std::string BindParameter(int index) const override { return "?"; }
   bool supports_concurrent_statements() const override { return true; }
   bool supports_transactions() const override { return false; }
   bool supports_get_sql_info() const override { return true; }
-  bool supports_get_objects() const override { return false; }
+  bool supports_get_objects() const override { return true; }
   bool supports_bulk_ingest() const override { return false; }
   bool supports_partitioned_data() const override { return true; }
   bool supports_dynamic_parameter_binding() const override { return true; }
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index bb83013..cd6cd1e 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -53,6 +53,50 @@ namespace {
   } while (false)
 }  // namespace
 
+//------------------------------------------------------------
+// DriverQuirks
+
+AdbcStatusCode DoIngestSampleTable(struct AdbcConnection* connection,
+                                   const std::string& name, struct AdbcError* error) {
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  CHECK_OK(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
+                                      {"strings", NANOARROW_TYPE_STRING}}));
+  CHECK_OK((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
+                                            {42, -42, std::nullopt},
+                                            {"foo", std::nullopt, ""})));
+
+  Handle<struct AdbcStatement> statement;
+  CHECK_OK(AdbcStatementNew(connection, &statement.value, error));
+  CHECK_OK(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                  name.c_str(), error));
+  CHECK_OK(AdbcStatementBind(&statement.value, &array.value, &schema.value, error));
+  CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error));
+  CHECK_OK(AdbcStatementRelease(&statement.value, error));
+  return ADBC_STATUS_OK;
+}
+
+void IngestSampleTable(struct AdbcConnection* connection, struct AdbcError* error) {
+  ASSERT_THAT(DoIngestSampleTable(connection, "bulk_ingest", error), IsOkStatus(error));
+}
+
+AdbcStatusCode DriverQuirks::EnsureSampleTable(struct AdbcConnection* connection,
+                                               const std::string& name,
+                                               struct AdbcError* error) const {
+  CHECK_OK(DropTable(connection, name, error));
+  return CreateSampleTable(connection, name, error);
+}
+
+AdbcStatusCode DriverQuirks::CreateSampleTable(struct AdbcConnection* connection,
+                                               const std::string& name,
+                                               struct AdbcError* error) const {
+  if (!supports_bulk_ingest()) {
+    return ADBC_STATUS_NOT_IMPLEMENTED;
+  }
+  return DoIngestSampleTable(connection, name, error);
+}
+
 //------------------------------------------------------------
 // Tests of AdbcDatabase
 
@@ -193,30 +237,6 @@ void ConnectionTest::TestAutocommitToggle() {
 //------------------------------------------------------------
 // Tests of metadata
 
-void IngestSampleTable(struct AdbcConnection* connection, struct AdbcError* error) {
-  Handle<struct ArrowSchema> schema;
-  Handle<struct ArrowArray> array;
-  struct ArrowError na_error;
-  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
-                                         {"strings", NANOARROW_TYPE_STRING}}),
-              IsOkErrno());
-  ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
-                                               {42, -42, std::nullopt},
-                                               {"foo", std::nullopt, ""})),
-              IsOkErrno());
-
-  Handle<struct AdbcStatement> statement;
-  ASSERT_THAT(AdbcStatementNew(connection, &statement.value, error), IsOkStatus(error));
-  ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
-                                     "bulk_ingest", error),
-              IsOkStatus(error));
-  ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, error),
-              IsOkStatus(error));
-  ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error),
-              IsOkStatus(error));
-  ASSERT_THAT(AdbcStatementRelease(&statement.value, error), IsOkStatus(error));
-}
-
 void ConnectionTest::TestMetadataGetInfo() {
   ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
   ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
@@ -513,9 +533,8 @@ void ConnectionTest::TestMetadataGetObjectsTables() {
     GTEST_SKIP();
   }
 
-  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+  ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error),
               IsOkStatus(&error));
-  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
 
   std::vector<std::pair<const char*, bool>> test_cases = {
       {nullptr, true}, {"bulk_%", true}, {"asdf%", false}};
@@ -593,9 +612,8 @@ void ConnectionTest::TestMetadataGetObjectsTablesTypes() {
     GTEST_SKIP();
   }
 
-  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+  ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error),
               IsOkStatus(&error));
-  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
 
   std::vector<const char*> table_types(2);
   table_types[0] = "this_table_type_does_not_exist";
@@ -668,9 +686,8 @@ void ConnectionTest::TestMetadataGetObjectsColumns() {
 
   ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
   ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
-  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+  ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error),
               IsOkStatus(&error));
-  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
 
   struct TestCase {
     std::optional<std::string> filter;
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index c57602b..5cba030 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -44,6 +44,19 @@ class DriverQuirks {
     return ADBC_STATUS_OK;
   }
 
+  virtual AdbcStatusCode EnsureSampleTable(struct AdbcConnection* connection,
+                                           const std::string& name,
+                                           struct AdbcError* error) const;
+
+  /// \brief Create a table of sample data with a fixed schema for testing.
+  ///
+  /// The table should have two columns:
+  /// - "int64s" with Arrow type int64.
+  /// - "strings" with Arrow type utf8.
+  virtual AdbcStatusCode CreateSampleTable(struct AdbcConnection* connection,
+                                           const std::string& name,
+                                           struct AdbcError* error) const;
+
   /// \brief Return the SQL to reference the bind parameter of the given index
   virtual std::string BindParameter(int index) const { return "?"; }
 
diff --git a/go/adbc/driver/flightsql/flightsql_adbc.go b/go/adbc/driver/flightsql/flightsql_adbc.go
index d806740..3d483eb 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc.go
@@ -32,12 +32,14 @@
 package flightsql
 
 import (
+	"bytes"
 	"context"
 	"crypto/tls"
 	"crypto/x509"
 	"fmt"
 	"io"
 	"net/url"
+	"regexp"
 	"runtime/debug"
 	"strings"
 	"time"
@@ -47,6 +49,8 @@ import (
 	"github.com/apache/arrow/go/v11/arrow/array"
 	"github.com/apache/arrow/go/v11/arrow/flight"
 	"github.com/apache/arrow/go/v11/arrow/flight/flightsql"
+	"github.com/apache/arrow/go/v11/arrow/flight/flightsql/schema_ref"
+	"github.com/apache/arrow/go/v11/arrow/ipc"
 	"github.com/apache/arrow/go/v11/arrow/memory"
 	"github.com/bluele/gcache"
 	"golang.org/x/exp/maps"
@@ -564,10 +568,420 @@ func (c *cnxn) GetInfo(ctx context.Context, infoCodes []adbc.InfoCode) (array.Re
 // All non-empty, non-nil strings should be a search pattern (as described
 // earlier).
 func (c *cnxn) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (array.RecordReader, error) {
-	return nil, adbc.Error{
-		Msg:  "Not implemented: GetObjects",
-		Code: adbc.StatusNotImplemented,
+	ctx = metadata.NewOutgoingContext(ctx, c.hdrs)
+	g := getObjects{ctx: ctx, depth: depth, catalog: catalog, dbSchema: dbSchema, tableName: tableName, columnName: columnName, tableType: tableType}
+	if err := g.init(c); err != nil {
+		return nil, err
+	}
+	defer g.release()
+
+	// To avoid an N+1 query problem, we assume result sets here will fit in memory and build up a single response.
+	info, err := c.cl.GetCatalogs(ctx)
+	if err != nil {
+		return nil, adbcFromFlightStatus(err)
+	}
+
+	rdr, err := c.readInfo(ctx, schema_ref.Catalogs, info)
+	if err != nil {
+		return nil, adbcFromFlightStatus(err)
+	}
+	defer rdr.Release()
+
+	for rdr.Next() {
+		arr := rdr.Record().Column(0).(*array.String)
+		for i := 0; i < arr.Len(); i++ {
+			catalogName := arr.Value(i)
+			g.appendCatalog(catalogName)
+		}
+	}
+
+	if err = rdr.Err(); err != nil {
+		return nil, adbcFromFlightStatus(err)
+	}
+
+	return g.finish()
+}
+
+// Helper to store state needed for GetObjects
+type getObjects struct {
+	ctx        context.Context
+	depth      adbc.ObjectDepth
+	catalog    *string
+	dbSchema   *string
+	tableName  *string
+	columnName *string
+	tableType  []string
+
+	builder           *array.RecordBuilder
+	schemaLookup      map[string][]string
+	tableLookup       map[catalogAndSchema][]tableInfo
+	catalogPattern    *regexp.Regexp
+	columnNamePattern *regexp.Regexp
+
+	catalogNameBuilder           *array.StringBuilder
+	catalogDbSchemasBuilder      *array.ListBuilder
+	catalogDbSchemasItems        *array.StructBuilder
+	dbSchemaNameBuilder          *array.StringBuilder
+	dbSchemaTablesBuilder        *array.ListBuilder
+	dbSchemaTablesItems          *array.StructBuilder
+	tableNameBuilder             *array.StringBuilder
+	tableTypeBuilder             *array.StringBuilder
+	tableColumnsBuilder          *array.ListBuilder
+	tableColumnsItems            *array.StructBuilder
+	columnNameBuilder            *array.StringBuilder
+	ordinalPositionBuilder       *array.Int32Builder
+	remarksBuilder               *array.StringBuilder
+	xdbcDataTypeBuilder          *array.Int16Builder
+	xdbcTypeNameBuilder          *array.StringBuilder
+	xdbcColumnSizeBuilder        *array.Int32Builder
+	xdbcDecimalDigitsBuilder     *array.Int16Builder
+	xdbcNumPrecRadixBuilder      *array.Int16Builder
+	xdbcNullableBuilder          *array.Int16Builder
+	xdbcColumnDefBuilder         *array.StringBuilder
+	xdbcSqlDataTypeBuilder       *array.Int16Builder
+	xdbcDatetimeSubBuilder       *array.Int16Builder
+	xdbcCharOctetLengthBuilder   *array.Int32Builder
+	xdbcIsNullableBuilder        *array.StringBuilder
+	xdbcScopeCatalogBuilder      *array.StringBuilder
+	xdbcScopeSchemaBuilder       *array.StringBuilder
+	xdbcScopeTableBuilder        *array.StringBuilder
+	xdbcIsAutoincrementBuilder   *array.BooleanBuilder
+	xdbcIsGeneratedcolumnBuilder *array.BooleanBuilder
+	tableConstraintsBuilder      *array.ListBuilder
+}
+
+func (g *getObjects) init(c *cnxn) error {
+	if catalogToDbSchemas, err := c.getObjectsDbSchemas(g.ctx, g.depth, g.catalog, g.dbSchema); err != nil {
+		return err
+	} else {
+		g.schemaLookup = catalogToDbSchemas
+	}
+
+	if tableLookup, err := c.getObjectsTables(g.ctx, g.depth, g.catalog, g.dbSchema, g.tableName, g.columnName, g.tableType); err != nil {
+		return err
+	} else {
+		g.tableLookup = tableLookup
+	}
+
+	if catalogPattern, err := patternToRegexp(g.catalog); err != nil {
+		return adbc.Error{
+			Msg:  err.Error(),
+			Code: adbc.StatusInvalidArgument,
+		}
+	} else {
+		g.catalogPattern = catalogPattern
+	}
+	if columnNamePattern, err := patternToRegexp(g.columnName); err != nil {
+		return adbc.Error{
+			Msg:  err.Error(),
+			Code: adbc.StatusInvalidArgument,
+		}
+	} else {
+		g.columnNamePattern = columnNamePattern
+	}
+
+	g.builder = array.NewRecordBuilder(c.db.alloc, adbc.GetObjectsSchema)
+	g.catalogNameBuilder = g.builder.Field(0).(*array.StringBuilder)
+	g.catalogDbSchemasBuilder = g.builder.Field(1).(*array.ListBuilder)
+	g.catalogDbSchemasItems = g.catalogDbSchemasBuilder.ValueBuilder().(*array.StructBuilder)
+	g.dbSchemaNameBuilder = g.catalogDbSchemasItems.FieldBuilder(0).(*array.StringBuilder)
+	g.dbSchemaTablesBuilder = g.catalogDbSchemasItems.FieldBuilder(1).(*array.ListBuilder)
+	g.dbSchemaTablesItems = g.dbSchemaTablesBuilder.ValueBuilder().(*array.StructBuilder)
+	g.tableNameBuilder = g.dbSchemaTablesItems.FieldBuilder(0).(*array.StringBuilder)
+	g.tableTypeBuilder = g.dbSchemaTablesItems.FieldBuilder(1).(*array.StringBuilder)
+	g.tableColumnsBuilder = g.dbSchemaTablesItems.FieldBuilder(2).(*array.ListBuilder)
+	g.tableColumnsItems = g.tableColumnsBuilder.ValueBuilder().(*array.StructBuilder)
+	g.columnNameBuilder = g.tableColumnsItems.FieldBuilder(0).(*array.StringBuilder)
+	g.ordinalPositionBuilder = g.tableColumnsItems.FieldBuilder(1).(*array.Int32Builder)
+	g.remarksBuilder = g.tableColumnsItems.FieldBuilder(2).(*array.StringBuilder)
+	g.xdbcDataTypeBuilder = g.tableColumnsItems.FieldBuilder(3).(*array.Int16Builder)
+	g.xdbcTypeNameBuilder = g.tableColumnsItems.FieldBuilder(4).(*array.StringBuilder)
+	g.xdbcColumnSizeBuilder = g.tableColumnsItems.FieldBuilder(5).(*array.Int32Builder)
+	g.xdbcDecimalDigitsBuilder = g.tableColumnsItems.FieldBuilder(6).(*array.Int16Builder)
+	g.xdbcNumPrecRadixBuilder = g.tableColumnsItems.FieldBuilder(7).(*array.Int16Builder)
+	g.xdbcNullableBuilder = g.tableColumnsItems.FieldBuilder(8).(*array.Int16Builder)
+	g.xdbcColumnDefBuilder = g.tableColumnsItems.FieldBuilder(9).(*array.StringBuilder)
+	g.xdbcSqlDataTypeBuilder = g.tableColumnsItems.FieldBuilder(10).(*array.Int16Builder)
+	g.xdbcDatetimeSubBuilder = g.tableColumnsItems.FieldBuilder(11).(*array.Int16Builder)
+	g.xdbcCharOctetLengthBuilder = g.tableColumnsItems.FieldBuilder(12).(*array.Int32Builder)
+	g.xdbcIsNullableBuilder = g.tableColumnsItems.FieldBuilder(13).(*array.StringBuilder)
+	g.xdbcScopeCatalogBuilder = g.tableColumnsItems.FieldBuilder(14).(*array.StringBuilder)
+	g.xdbcScopeSchemaBuilder = g.tableColumnsItems.FieldBuilder(15).(*array.StringBuilder)
+	g.xdbcScopeTableBuilder = g.tableColumnsItems.FieldBuilder(16).(*array.StringBuilder)
+	g.xdbcIsAutoincrementBuilder = g.tableColumnsItems.FieldBuilder(17).(*array.BooleanBuilder)
+	g.xdbcIsGeneratedcolumnBuilder = g.tableColumnsItems.FieldBuilder(18).(*array.BooleanBuilder)
+	g.tableConstraintsBuilder = g.dbSchemaTablesItems.FieldBuilder(3).(*array.ListBuilder)
+
+	return nil
+}
+
+func (g *getObjects) release() {
+	g.builder.Release()
+}
+
+func (g *getObjects) finish() (array.RecordReader, error) {
+	record := g.builder.NewRecord()
+	result, err := array.NewRecordReader(g.builder.Schema(), []arrow.Record{record})
+	if err != nil {
+		return nil, adbc.Error{
+			Msg:  err.Error(),
+			Code: adbc.StatusInternal,
+		}
+	}
+	return result, nil
+}
+
+func (g *getObjects) appendCatalog(catalogName string) {
+	if g.catalogPattern != nil && !g.catalogPattern.MatchString(catalogName) {
+		return
+	}
+	g.catalogNameBuilder.Append(catalogName)
+
+	if g.depth == adbc.ObjectDepthCatalogs {
+		g.catalogDbSchemasBuilder.AppendNull()
+		return
+	}
+	g.catalogDbSchemasBuilder.Append(true)
+
+	for _, dbSchemaName := range g.schemaLookup[catalogName] {
+		g.appendDbSchema(catalogName, dbSchemaName)
+	}
+}
+
+func (g *getObjects) appendDbSchema(catalogName, dbSchemaName string) {
+	g.dbSchemaNameBuilder.Append(dbSchemaName)
+	g.catalogDbSchemasItems.Append(true)
+
+	if g.depth == adbc.ObjectDepthDBSchemas {
+		g.dbSchemaTablesBuilder.AppendNull()
+		return
+	}
+	g.dbSchemaTablesBuilder.Append(true)
+
+	for _, tableInfo := range g.tableLookup[catalogAndSchema{
+		catalog: catalogName,
+		schema:  dbSchemaName,
+	}] {
+		g.appendTableInfo(tableInfo)
+	}
+}
+
+func (g *getObjects) appendTableInfo(tableInfo tableInfo) {
+	g.tableNameBuilder.Append(tableInfo.name)
+	g.tableTypeBuilder.Append(tableInfo.tableType)
+	g.dbSchemaTablesItems.Append(true)
+
+	if g.depth == adbc.ObjectDepthTables {
+		g.tableColumnsBuilder.AppendNull()
+		g.tableConstraintsBuilder.AppendNull()
+		return
+	}
+	g.tableColumnsBuilder.Append(true)
+	// TODO: unimplemented for now
+	g.tableConstraintsBuilder.Append(true)
+
+	for colIndex, column := range tableInfo.schema.Fields() {
+		if g.columnNamePattern != nil && !g.columnNamePattern.MatchString(column.Name) {
+			continue
+		}
+		g.columnNameBuilder.Append(column.Name)
+		g.ordinalPositionBuilder.Append(int32(colIndex + 1))
+		g.remarksBuilder.AppendNull()
+		g.xdbcDataTypeBuilder.AppendNull()
+		g.xdbcTypeNameBuilder.AppendNull()
+		g.xdbcColumnSizeBuilder.AppendNull()
+		g.xdbcDecimalDigitsBuilder.AppendNull()
+		g.xdbcNumPrecRadixBuilder.AppendNull()
+		g.xdbcNullableBuilder.AppendNull()
+		g.xdbcColumnDefBuilder.AppendNull()
+		g.xdbcSqlDataTypeBuilder.AppendNull()
+		g.xdbcDatetimeSubBuilder.AppendNull()
+		g.xdbcCharOctetLengthBuilder.AppendNull()
+		g.xdbcIsNullableBuilder.AppendNull()
+		g.xdbcScopeCatalogBuilder.AppendNull()
+		g.xdbcScopeSchemaBuilder.AppendNull()
+		g.xdbcScopeTableBuilder.AppendNull()
+		g.xdbcIsAutoincrementBuilder.AppendNull()
+		g.xdbcIsGeneratedcolumnBuilder.AppendNull()
+
+		g.tableColumnsItems.Append(true)
+	}
+}
+
+// Helper function to read and validate a metadata stream
+func (c *cnxn) readInfo(ctx context.Context, expectedSchema *arrow.Schema, info *flight.FlightInfo) (array.RecordReader, error) {
+	// use a default queueSize for the reader
+	rdr, err := newRecordReader(ctx, c.db.alloc, c.cl, info, c.clientCache, 5)
+	if err != nil {
+		return nil, adbcFromFlightStatus(err)
+	}
+
+	if !rdr.Schema().Equal(expectedSchema) {
+		rdr.Release()
+		return nil, adbc.Error{
+			Msg:  fmt.Sprintf("Invalid schema returned for: expected %s, got %s", expectedSchema.String(), rdr.Schema().String()),
+			Code: adbc.StatusInternal,
+		}
+	}
+	return rdr, nil
+}
+
+// Helper function that compiles a SQL-style pattern (%, _) to a regex
+func patternToRegexp(pattern *string) (*regexp.Regexp, error) {
+	if pattern == nil {
+		return nil, nil
+	}
+
+	var builder strings.Builder
+	if _, err := builder.WriteString("^"); err != nil {
+		return nil, err
+	}
+	for _, c := range *pattern {
+		switch {
+		case c == rune('_'):
+			if _, err := builder.WriteString("."); err != nil {
+				return nil, err
+			}
+		case c == rune('%'):
+			if _, err := builder.WriteString(".*"); err != nil {
+				return nil, err
+			}
+		default:
+			if _, err := builder.WriteString(regexp.QuoteMeta(string([]rune{c}))); err != nil {
+				return nil, err
+			}
+		}
+	}
+	if _, err := builder.WriteString("$"); err != nil {
+		return nil, err
+	}
+	return regexp.Compile(builder.String())
+}
+
+// Helper function to build up a map of catalogs to DB schemas
+func (c *cnxn) getObjectsDbSchemas(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string) (result map[string][]string, err error) {
+	if depth == adbc.ObjectDepthCatalogs {
+		return
+	}
+	result = make(map[string][]string)
+	// Pre-populate the map of which schemas are in which catalogs
+	info, err := c.cl.GetDBSchemas(ctx, &flightsql.GetDBSchemasOpts{DbSchemaFilterPattern: dbSchema})
+	if err != nil {
+		return nil, adbcFromFlightStatus(err)
+	}
+
+	rdr, err := c.readInfo(ctx, schema_ref.DBSchemas, info)
+	if err != nil {
+		return nil, adbcFromFlightStatus(err)
+	}
+	defer rdr.Release()
+
+	for rdr.Next() {
+		// Nullable
+		catalog := rdr.Record().Column(0).(*array.String)
+		// Non-nullable
+		dbSchema := rdr.Record().Column(1).(*array.String)
+
+		for i := 0; i < catalog.Len(); i++ {
+			catalogName := ""
+			if !catalog.IsNull(i) {
+				catalogName = catalog.Value(i)
+			}
+			result[catalogName] = append(result[catalogName], dbSchema.Value(i))
+		}
+	}
+
+	if rdr.Err() != nil {
+		result = nil
+		err = adbcFromFlightStatus(rdr.Err())
+	}
+	return
+}
+
+type catalogAndSchema struct {
+	catalog, schema string
+}
+
+type tableInfo struct {
+	name, tableType string
+	schema          *arrow.Schema
+}
+
+func (c *cnxn) getObjectsTables(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (result map[catalogAndSchema][]tableInfo, err error) {
+	if depth == adbc.ObjectDepthCatalogs || depth == adbc.ObjectDepthDBSchemas {
+		return
+	}
+	result = make(map[catalogAndSchema][]tableInfo)
+
+	// Pre-populate the map of which tables live in which catalog/schema
+	includeSchema := depth == adbc.ObjectDepthAll || depth == adbc.ObjectDepthColumns
+	info, err := c.cl.GetTables(ctx, &flightsql.GetTablesOpts{
+		TableNameFilterPattern: tableName,
+		TableTypes:             tableType,
+		IncludeSchema:          includeSchema,
+	})
+	if err != nil {
+		return nil, adbcFromFlightStatus(err)
+	}
+
+	expectedSchema := schema_ref.Tables
+	if includeSchema {
+		expectedSchema = schema_ref.TablesWithIncludedSchema
+	}
+	rdr, err := c.readInfo(ctx, expectedSchema, info)
+	if err != nil {
+		return nil, adbcFromFlightStatus(err)
+	}
+	defer rdr.Release()
+
+	for rdr.Next() {
+		// Nullable
+		catalog := rdr.Record().Column(0).(*array.String)
+		dbSchema := rdr.Record().Column(1).(*array.String)
+		// Non-nullable
+		tableName := rdr.Record().Column(2).(*array.String)
+		tableType := rdr.Record().Column(3).(*array.String)
+
+		for i := 0; i < catalog.Len(); i++ {
+			catalogName := ""
+			dbSchemaName := ""
+			if !catalog.IsNull(i) {
+				catalogName = catalog.Value(i)
+			}
+			if !dbSchema.IsNull(i) {
+				dbSchemaName = dbSchema.Value(i)
+			}
+			key := catalogAndSchema{
+				catalog: catalogName,
+				schema:  dbSchemaName,
+			}
+
+			var schema *arrow.Schema
+			if includeSchema {
+				reader, err := ipc.NewReader(bytes.NewReader(rdr.Record().Column(4).(*array.Binary).Value(i)))
+				if err != nil {
+					return nil, adbc.Error{
+						Msg:  err.Error(),
+						Code: adbc.StatusInternal,
+					}
+				}
+				schema = reader.Schema()
+				reader.Release()
+			}
+
+			result[key] = append(result[key], tableInfo{
+				name:      tableName.Value(i),
+				tableType: tableType.Value(i),
+				schema:    schema,
+			})
+		}
+	}
+
+	if rdr.Err() != nil {
+		result = nil
+		err = adbcFromFlightStatus(rdr.Err())
 	}
+	return
 }
 
 func (c *cnxn) GetTableSchema(ctx context.Context, catalog *string, dbSchema *string, tableName string) (*arrow.Schema, error) {
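
To make the nested layout assembled by the builders above concrete, here is a sketch (again not from the commit) of walking one GetObjects record down to the table level. The printTables helper is hypothetical; it relies only on the fixed GetObjects result schema used above: column 0 is catalog_name, column 1 is a list of db-schema structs, each of which holds a list of table structs.

package example

import (
	"fmt"

	"github.com/apache/arrow/go/v11/arrow"
	"github.com/apache/arrow/go/v11/arrow/array"
)

// printTables walks one GetObjects record: column 0 holds catalog names and
// column 1 is a list<struct> of db schemas, whose second field is in turn a
// list<struct> of tables (name, type, columns, constraints).
func printTables(rec arrow.Record) {
	catalogs := rec.Column(0).(*array.String)

	dbSchemas := rec.Column(1).(*array.List)
	dbSchemaItems := dbSchemas.ListValues().(*array.Struct)
	dbSchemaNames := dbSchemaItems.Field(0).(*array.String)

	tables := dbSchemaItems.Field(1).(*array.List)
	tableItems := tables.ListValues().(*array.Struct)
	tableNames := tableItems.Field(0).(*array.String)
	tableTypes := tableItems.Field(1).(*array.String)

	schemaOffsets := dbSchemas.Offsets() // one more entry than there are catalogs
	tableOffsets := tables.Offsets()

	for i := 0; i < catalogs.Len(); i++ {
		if dbSchemas.IsNull(i) {
			continue // depth was Catalogs; the driver appended a null schema list
		}
		for j := schemaOffsets[i]; j < schemaOffsets[i+1]; j++ {
			if tables.IsNull(int(j)) {
				continue // depth was DBSchemas; the driver appended a null table list
			}
			for k := tableOffsets[j]; k < tableOffsets[j+1]; k++ {
				fmt.Printf("%s . %s . %s (%s)\n",
					catalogs.Value(i), dbSchemaNames.Value(int(j)),
					tableNames.Value(int(k)), tableTypes.Value(int(k)))
			}
		}
	}
}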