You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/06/21 16:41:21 UTC

[arrow-adbc] branch main updated: fix(go/adbc/driver/snowflake): fix potential deadlock and error handling (#828)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new f35485a5 fix(go/adbc/driver/snowflake): fix potential deadlock and error handling (#828)
f35485a5 is described below

commit f35485a5f3c9597668c0b4a8936621c97c4adc15
Author: Matt Topol <zo...@gmail.com>
AuthorDate: Wed Jun 21 12:41:17 2023 -0400

    fix(go/adbc/driver/snowflake): fix potential deadlock and error handling (#828)
    
    Found these issues while doing some performance testing.
---
 go/adbc/driver/flightsql/record_reader.go |  4 +++
 go/adbc/driver/snowflake/driver.go        |  5 ++-
 go/adbc/driver/snowflake/record_reader.go | 59 +++++++++++++++++--------------
 3 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/go/adbc/driver/flightsql/record_reader.go b/go/adbc/driver/flightsql/record_reader.go
index 6bcb7791..409ce58e 100644
--- a/go/adbc/driver/flightsql/record_reader.go
+++ b/go/adbc/driver/flightsql/record_reader.go
@@ -95,6 +95,10 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql.
 		schema = rdr.Schema()
 		group.Go(func() error {
 			defer rdr.Release()
+			if numEndpoints > 1 {
+				defer close(ch)
+			}
+
 			for rdr.Next() && ctx.Err() == nil {
 				rec := rdr.Record()
 				rec.Retain()
diff --git a/go/adbc/driver/snowflake/driver.go b/go/adbc/driver/snowflake/driver.go
index bd8b2c01..c02b58dd 100644
--- a/go/adbc/driver/snowflake/driver.go
+++ b/go/adbc/driver/snowflake/driver.go
@@ -161,9 +161,8 @@ func errToAdbcErr(code adbc.Status, err error) error {
 	var sferr *gosnowflake.SnowflakeError
 	if errors.As(err, &sferr) {
 		var sqlstate [5]byte
-		if len(sferr.SQLState) > 0 {
-			copy(sqlstate[:], sferr.SQLState[:5])
-		}
+		copy(sqlstate[:], []byte(sferr.SQLState))
+
 		return adbc.Error{
 			Code:       code,
 			Msg:        sferr.Error(),
diff --git a/go/adbc/driver/snowflake/record_reader.go b/go/adbc/driver/snowflake/record_reader.go
index b3041ddd..32169bb7 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -274,6 +274,9 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
 	group.Go(func() error {
 		defer rr.Release()
 		defer r.Close()
+		if len(batches) > 1 {
+			defer close(ch)
+		}
 
 		for rr.Next() && ctx.Err() == nil {
 			rec := rr.Record()
@@ -297,39 +300,41 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
 	}
 
 	lastChannelIndex := len(chs) - 1
-	for i, b := range batches[1:] {
-		batch, batchIdx := b, i+1
-		chs[batchIdx] = make(chan arrow.Record, bufferSize)
-		group.Go(func() error {
-			// close channels (except the last) so that Next can move on to the next channel properly
-			if batchIdx != lastChannelIndex {
-				defer close(chs[batchIdx])
-			}
-
-			rdr, err := batch.GetStream(ctx)
-			if err != nil {
-				return err
-			}
-			defer rdr.Close()
+	go func() {
+		for i, b := range batches[1:] {
+			batch, batchIdx := b, i+1
+			chs[batchIdx] = make(chan arrow.Record, bufferSize)
+			group.Go(func() error {
+				// close channels (except the last) so that Next can move on to the next channel properly
+				if batchIdx != lastChannelIndex {
+					defer close(chs[batchIdx])
+				}
 
-			rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
-			if err != nil {
-				return err
-			}
-			defer rr.Release()
+				rdr, err := batch.GetStream(ctx)
+				if err != nil {
+					return err
+				}
+				defer rdr.Close()
 
-			for rr.Next() && ctx.Err() == nil {
-				rec := rr.Record()
-				rec, err = recTransform(ctx, rec)
+				rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
 				if err != nil {
 					return err
 				}
-				chs[batchIdx] <- rec
-			}
+				defer rr.Release()
 
-			return rr.Err()
-		})
-	}
+				for rr.Next() && ctx.Err() == nil {
+					rec := rr.Record()
+					rec, err = recTransform(ctx, rec)
+					if err != nil {
+						return err
+					}
+					chs[batchIdx] <- rec
+				}
+
+				return rr.Err()
+			})
+		}
+	}()
 
 	go func() {
 		rdr.err = group.Wait()