You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/06/21 16:41:21 UTC
[arrow-adbc] branch main updated: fix(go/adbc/driver/snowflake): fix potential deadlock and error handling (#828)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new f35485a5 fix(go/adbc/driver/snowflake): fix potential deadlock and error handling (#828)
f35485a5 is described below
commit f35485a5f3c9597668c0b4a8936621c97c4adc15
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Wed Jun 21 12:41:17 2023 -0400
fix(go/adbc/driver/snowflake): fix potential deadlock and error handling (#828)
Found these when trying to do some performance testing.
---
go/adbc/driver/flightsql/record_reader.go | 4 +++
go/adbc/driver/snowflake/driver.go | 5 ++-
go/adbc/driver/snowflake/record_reader.go | 59 +++++++++++++++++--------------
3 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/go/adbc/driver/flightsql/record_reader.go b/go/adbc/driver/flightsql/record_reader.go
index 6bcb7791..409ce58e 100644
--- a/go/adbc/driver/flightsql/record_reader.go
+++ b/go/adbc/driver/flightsql/record_reader.go
@@ -95,6 +95,10 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql.
schema = rdr.Schema()
group.Go(func() error {
defer rdr.Release()
+ if numEndpoints > 1 {
+ defer close(ch)
+ }
+
for rdr.Next() && ctx.Err() == nil {
rec := rdr.Record()
rec.Retain()
diff --git a/go/adbc/driver/snowflake/driver.go b/go/adbc/driver/snowflake/driver.go
index bd8b2c01..c02b58dd 100644
--- a/go/adbc/driver/snowflake/driver.go
+++ b/go/adbc/driver/snowflake/driver.go
@@ -161,9 +161,8 @@ func errToAdbcErr(code adbc.Status, err error) error {
var sferr *gosnowflake.SnowflakeError
if errors.As(err, &sferr) {
var sqlstate [5]byte
- if len(sferr.SQLState) > 0 {
- copy(sqlstate[:], sferr.SQLState[:5])
- }
+ copy(sqlstate[:], []byte(sferr.SQLState))
+
return adbc.Error{
Code: code,
Msg: sferr.Error(),
diff --git a/go/adbc/driver/snowflake/record_reader.go b/go/adbc/driver/snowflake/record_reader.go
index b3041ddd..32169bb7 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -274,6 +274,9 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
group.Go(func() error {
defer rr.Release()
defer r.Close()
+ if len(batches) > 1 {
+ defer close(ch)
+ }
for rr.Next() && ctx.Err() == nil {
rec := rr.Record()
@@ -297,39 +300,41 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
}
lastChannelIndex := len(chs) - 1
- for i, b := range batches[1:] {
- batch, batchIdx := b, i+1
- chs[batchIdx] = make(chan arrow.Record, bufferSize)
- group.Go(func() error {
- // close channels (except the last) so that Next can move on to the next channel properly
- if batchIdx != lastChannelIndex {
- defer close(chs[batchIdx])
- }
-
- rdr, err := batch.GetStream(ctx)
- if err != nil {
- return err
- }
- defer rdr.Close()
+ go func() {
+ for i, b := range batches[1:] {
+ batch, batchIdx := b, i+1
+ chs[batchIdx] = make(chan arrow.Record, bufferSize)
+ group.Go(func() error {
+ // close channels (except the last) so that Next can move on to the next channel properly
+ if batchIdx != lastChannelIndex {
+ defer close(chs[batchIdx])
+ }
- rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
- if err != nil {
- return err
- }
- defer rr.Release()
+ rdr, err := batch.GetStream(ctx)
+ if err != nil {
+ return err
+ }
+ defer rdr.Close()
- for rr.Next() && ctx.Err() == nil {
- rec := rr.Record()
- rec, err = recTransform(ctx, rec)
+ rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
if err != nil {
return err
}
- chs[batchIdx] <- rec
- }
+ defer rr.Release()
- return rr.Err()
- })
- }
+ for rr.Next() && ctx.Err() == nil {
+ rec := rr.Record()
+ rec, err = recTransform(ctx, rec)
+ if err != nil {
+ return err
+ }
+ chs[batchIdx] <- rec
+ }
+
+ return rr.Err()
+ })
+ }
+ }()
go func() {
rdr.err = group.Wait()