You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/25 23:05:21 UTC

[arrow-adbc] branch main updated: test(go/adbc/driver/flightsql): add tests for DoGet fallback (#389)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new f06089b  test(go/adbc/driver/flightsql): add tests for DoGet fallback (#389)
f06089b is described below

commit f06089b0fb20863cc63dab5963b2a56387cf0b49
Author: David Li <li...@gmail.com>
AuthorDate: Wed Jan 25 18:05:15 2023 -0500

    test(go/adbc/driver/flightsql): add tests for DoGet fallback (#389)
---
 go/adbc/driver/flightsql/record_reader_test.go | 99 +++++++++++++++++++++++++-
 1 file changed, 96 insertions(+), 3 deletions(-)

diff --git a/go/adbc/driver/flightsql/record_reader_test.go b/go/adbc/driver/flightsql/record_reader_test.go
index 2ab1fff..670d943 100644
--- a/go/adbc/driver/flightsql/record_reader_test.go
+++ b/go/adbc/driver/flightsql/record_reader_test.go
@@ -19,6 +19,7 @@ package flightsql
 
 import (
 	"context"
+	"fmt"
 	"net/url"
 	"testing"
 
@@ -44,10 +45,17 @@ func orderingSchema() *arrow.Schema {
 
 type testFlightService struct {
 	flight.BaseFlightServer
-	alloc memory.Allocator
+	alloc        memory.Allocator
+	failureCount int
 }
 
 func (f *testFlightService) DoGet(request *flight.Ticket, stream flight.FlightService_DoGetServer) error {
+	// Crude way to make requests fail until retried enough times
+	if f.failureCount > 0 {
+		f.failureCount--
+		return fmt.Errorf("Failed request")
+	}
+
 	schema := orderingSchema()
 	wr := flight.NewRecordWriter(stream, ipc.WithSchema(schema))
 	defer wr.Close()
@@ -86,6 +94,7 @@ type RecordReaderTests struct {
 
 	alloc   *memory.CheckedAllocator
 	server  flight.Server
+	service *testFlightService
 	cl      *flightsql.Client
 	clCache gcache.Cache
 }
@@ -95,8 +104,8 @@ func (suite *RecordReaderTests) SetupSuite() {
 
 	suite.server = flight.NewServerWithMiddleware(nil)
 	suite.NoError(suite.server.Init("localhost:0"))
-	svc := &testFlightService{alloc: suite.alloc}
-	suite.server.RegisterFlightService(svc)
+	suite.service = &testFlightService{alloc: suite.alloc}
+	suite.server.RegisterFlightService(suite.service)
 
 	go func() {
 		// Explicitly ignore error
@@ -135,6 +144,90 @@ func (suite *RecordReaderTests) TearDownSuite() {
 	suite.alloc.AssertSize(suite.T(), 0)
 }
 
+func (suite *RecordReaderTests) TestFallbackFailedConnection() { // a read must still succeed when the first listed location is bad and the second is good
+	goodLocation := "grpc://" + suite.server.Addr().String() // address of the Flight server owned by this suite
+	badLocation := "grpc://127.0.0.2:1234" // presumably nothing is listening here, so this attempt fails; confirm
+	info := flight.FlightInfo{
+		Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),
+		Endpoint: []*flight.FlightEndpoint{
+			{
+				Ticket:   &flight.Ticket{Ticket: []byte{0}},
+				Location: []*flight.Location{{Uri: badLocation}, {Uri: goodLocation}}, // bad location is listed before the good one
+			},
+		},
+	}
+
+	reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3)
+	suite.NoError(err)
+	defer reader.Release()
+
+	suite.True(reader.Schema().Equal(orderingSchema())) // the reader must report the same schema that was put into info
+	suite.True(reader.Next()) // exactly four successful Next calls are expected ...
+	suite.True(reader.Next())
+	suite.True(reader.Next())
+	suite.True(reader.Next())
+	suite.False(reader.Next()) // ... and the fifth call must report the end of the stream
+	suite.NoError(reader.Err()) // the stream must have ended without an error
+}
+
+func (suite *RecordReaderTests) TestFallbackFailedDoGet() { // covers DoGet calls that return errors on a reachable server
+	defer func() { // suite.service is shared by the whole suite, so always clear the injected failures on exit
+		suite.service.failureCount = 0
+	}()
+
+	suite.service.failureCount = 2 // DoGet fails and decrements this while it is positive, so the next two calls fail
+	goodLocation := "grpc://" + suite.server.Addr().String() // address of the Flight server owned by this suite
+	info := flight.FlightInfo{
+		Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),
+		Endpoint: []*flight.FlightEndpoint{
+			{
+				Ticket:   &flight.Ticket{Ticket: []byte{0}},
+				Location: []*flight.Location{{Uri: goodLocation}, {Uri: goodLocation}, {Uri: goodLocation}}, // same server three times; presumably each entry is tried in turn (confirm in newRecordReader)
+			},
+		},
+	}
+
+	reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3)
+	suite.NoError(err)
+	defer reader.Release()
+
+	suite.True(reader.Schema().Equal(orderingSchema())) // the reader must report the same schema that was put into info
+	suite.True(reader.Next()) // two injected failures against three locations: the read is expected to succeed
+	suite.True(reader.Next())
+	suite.True(reader.Next())
+	suite.True(reader.Next())
+	suite.False(reader.Next()) // the fifth call must report the end of the stream
+	suite.NoError(reader.Err())
+
+	// Not enough retries: four injected failures exceed the three listed locations
+	suite.service.failureCount = 4
+	reader, err = newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3)
+	suite.NoError(err) // constructing the reader is still expected to succeed; the failure shows up on Next/Err
+	defer reader.Release() // releases the second reader; the earlier defer already captured the first one
+	suite.False(reader.Next()) // no batch may be produced
+	suite.Error(reader.Err()) // the failure must be reported through Err
+}
+
+func (suite *RecordReaderTests) TestFallbackFailed() { // when every listed location is bad, the read must end with an error
+	badLocation := "grpc://127.0.0.2:1234" // presumably nothing is listening here, so every attempt fails; confirm
+	info := flight.FlightInfo{
+		Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),
+		Endpoint: []*flight.FlightEndpoint{
+			{
+				Ticket:   &flight.Ticket{Ticket: []byte{0}},
+				Location: []*flight.Location{{Uri: badLocation}, {Uri: badLocation}}, // no good location to fall back to
+			},
+		},
+	}
+
+	reader, err := newRecordReader(context.Background(), suite.alloc, suite.cl, &info, suite.clCache, 3)
+	suite.NoError(err) // constructing the reader is expected to succeed; the failure shows up on Next/Err
+	defer reader.Release()
+
+	suite.False(reader.Next()) // no batch may be produced
+	suite.Error(reader.Err()) // the failure must be reported through Err
+}
+
 func (suite *RecordReaderTests) TestNoEndpoints() {
 	info := flight.FlightInfo{
 		Schema: flight.SerializeSchema(orderingSchema(), suite.alloc),