You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/25 23:05:21 UTC
[arrow-adbc] branch main updated: test(go/adbc/driver/flightsql): add tests for DoGet fallback (#389)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new f06089b test(go/adbc/driver/flightsql): add tests for DoGet fallback (#389)
f06089b is described below
commit f06089b0fb20863cc63dab5963b2a56387cf0b49
Author: David Li <li...@gmail.com>
AuthorDate: Wed Jan 25 18:05:15 2023 -0500
test(go/adbc/driver/flightsql): add tests for DoGet fallback (#389)
---
go/adbc/driver/flightsql/record_reader_test.go | 99 +++++++++++++++++++++++++-
1 file changed, 96 insertions(+), 3 deletions(-)
diff --git a/go/adbc/driver/flightsql/record_reader_test.go b/go/adbc/driver/flightsql/record_reader_test.go
index 2ab1fff..670d943 100644
--- a/go/adbc/driver/flightsql/record_reader_test.go
+++ b/go/adbc/driver/flightsql/record_reader_test.go
@@ -19,6 +19,7 @@ package flightsql
import (
"context"
+ "fmt"
"net/url"
"testing"
@@ -44,10 +45,17 @@ func orderingSchema() *arrow.Schema {
type testFlightService struct {
flight.BaseFlightServer
- alloc memory.Allocator
+ alloc memory.Allocator
+ failureCount int
}
func (f *testFlightService) DoGet(request *flight.Ticket, stream flight.FlightService_DoGetServer) error {
+ // Crude way to make requests fail until retried enough times
+ if f.failureCount > 0 {
+ f.failureCount--
+ return fmt.Errorf("Failed request")
+ }
+
schema := orderingSchema()
wr := flight.NewRecordWriter(stream, ipc.WithSchema(schema))
defer wr.Close()
@@ -86,6 +94,7 @@ type RecordReaderTests struct {
alloc *memory.CheckedAllocator
server flight.Server
+ service *testFlightService
cl *flightsql.Client
clCache gcache.Cache
}
@@ -95,8 +104,8 @@ func (suite *RecordReaderTests) SetupSuite() {
suite.server = flight.NewServerWithMiddleware(nil)
suite.NoError(suite.server.Init("localhost:0"))
- svc := &testFlightService{alloc: suite.alloc}
- suite.server.RegisterFlightService(svc)
+ suite.service = &testFlightService{alloc: suite.alloc}
+ suite.server.RegisterFlightService(suite.service)
go func() {
// Explicitly ignore error
@@ -135,6 +144,90 @@ func (suite *RecordReaderTests) TearDownSuite() {
suite.alloc.AssertSize(suite.T(), 0)
}
+func (suite *RecordReaderTests) TestFallbackFailedConnection() {
+ goodLocation := "grpc://" + suite.server.Addr().String()
+ badLocation := "grpc://127.0.0.2:1234"
+ info := flight.FlightInfo{
+ Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),
+ Endpoint: []*flight.FlightEndpoint{
+ {
+ Ticket: &flight.Ticket{Ticket: []byte{0}},
+ Location: []*flight.Location{{Uri: badLocation}, {Uri: goodLocation}},
+ },
+ },
+ }
+
+ reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3)
+ suite.NoError(err)
+ defer reader.Release()
+
+ suite.True(reader.Schema().Equal(orderingSchema()))
+ suite.True(reader.Next())
+ suite.True(reader.Next())
+ suite.True(reader.Next())
+ suite.True(reader.Next())
+ suite.False(reader.Next())
+ suite.NoError(reader.Err())
+}
+
+func (suite *RecordReaderTests) TestFallbackFailedDoGet() {
+ defer func() {
+ suite.service.failureCount = 0
+ }()
+
+ suite.service.failureCount = 2
+ goodLocation := "grpc://" + suite.server.Addr().String()
+ info := flight.FlightInfo{
+ Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),
+ Endpoint: []*flight.FlightEndpoint{
+ {
+ Ticket: &flight.Ticket{Ticket: []byte{0}},
+ Location: []*flight.Location{{Uri: goodLocation}, {Uri: goodLocation}, {Uri: goodLocation}},
+ },
+ },
+ }
+
+ reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3)
+ suite.NoError(err)
+ defer reader.Release()
+
+ suite.True(reader.Schema().Equal(orderingSchema()))
+ suite.True(reader.Next())
+ suite.True(reader.Next())
+ suite.True(reader.Next())
+ suite.True(reader.Next())
+ suite.False(reader.Next())
+ suite.NoError(reader.Err())
+
+ // Not enough retries: 4 injected failures but only 3 locations to fall back across
+ suite.service.failureCount = 4
+ reader, err = newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3)
+ suite.NoError(err)
+ defer reader.Release()
+ suite.False(reader.Next())
+ suite.Error(reader.Err())
+}
+
+func (suite *RecordReaderTests) TestFallbackFailed() {
+ badLocation := "grpc://127.0.0.2:1234"
+ info := flight.FlightInfo{
+ Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),
+ Endpoint: []*flight.FlightEndpoint{
+ {
+ Ticket: &flight.Ticket{Ticket: []byte{0}},
+ Location: []*flight.Location{{Uri: badLocation}, {Uri: badLocation}},
+ },
+ },
+ }
+
+ reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3)
+ suite.NoError(err)
+ defer reader.Release()
+
+ suite.False(reader.Next())
+ suite.Error(reader.Err())
+}
+
func (suite *RecordReaderTests) TestNoEndpoints() {
info := flight.FlightInfo{
Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),