You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "zeroshade (via GitHub)" <gi...@apache.org> on 2023/07/31 21:17:10 UTC

[GitHub] [arrow] zeroshade commented on a diff in pull request #36946: GH-36155: [C++][Go][FlightRPC] Add support for long-running queries

zeroshade commented on code in PR #36946:
URL: https://github.com/apache/arrow/pull/36946#discussion_r1279884912


##########
go/arrow/internal/flight_integration/scenario.go:
##########
@@ -1131,6 +1134,112 @@ func (tester *expirationTimeRenewFlightEndpointScenarioTester) RunClient(addr st
 	return nil
 }
 
+type pollFlightInfoScenarioTester struct {
+	flight.BaseFlightServer
+}
+
+func (tester *pollFlightInfoScenarioTester) MakeServer(port int) flight.Server {
+	srv := flight.NewServerWithMiddleware(nil)
+	srv.RegisterFlightService(tester)
+	initServer(port, srv)
+	return srv
+}
+
+func (tester *pollFlightInfoScenarioTester) PollFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.RetryInfo, error) {
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "number", Type: arrow.PrimitiveTypes.Uint32},
+		},
+		nil,
+	)
+	endpoints := []*flight.FlightEndpoint{
+		&flight.FlightEndpoint{
+			Ticket:   &flight.Ticket{Ticket: []byte("long-running query")},
+			Location: []*flight.Location{},
+		},
+	}
+	info := &flight.FlightInfo{
+		Schema:           flight.SerializeSchema(schema, memory.DefaultAllocator),
+		FlightDescriptor: desc,
+		Endpoint:         endpoints,
+		TotalRecords:     -1,
+		TotalBytes:       -1,
+	}
+	retryDesc := flight.FlightDescriptor{
+		Type: flight.DescriptorCMD,
+		Cmd:  []byte("retry"),
+	}
+	if desc.Type == retryDesc.Type && string(desc.Cmd) == string(retryDesc.Cmd) {
+		progress := float64(1.0)
+		return &flight.RetryInfo{
+			Info:             info,
+			FlightDescriptor: nil,
+			Progress:         &progress,
+			ExpirationTime:   nil,
+		}, nil
+	} else {
+		progress := float64(0.1)
+		return &flight.RetryInfo{
+			Info:             info,
+			FlightDescriptor: &retryDesc,
+			Progress:         &progress,
+			ExpirationTime:   timestamppb.New(time.Now().Add(time.Second * 10)),
+		}, nil
+	}
+}
+
+func (tester *pollFlightInfoScenarioTester) RunClient(addr string, opts ...grpc.DialOption) error {
+	client, err := flight.NewClientWithMiddleware(addr, nil, nil, opts...)
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+
+	ctx := context.Background()
+	desc := flight.FlightDescriptor{
+		Type: flight.DescriptorCMD,
+		Cmd:  []byte("heavy query"),
+	}
+	info, err := client.PollFlightInfo(ctx, &desc)
+	if err != nil {
+		return err
+	}
+	if info.FlightDescriptor == nil {
+		return fmt.Errorf("description is missing: %s", info.String())
+	}
+	if info.Progress == nil {
+		return fmt.Errorf("progress is missing: %s", info.String())
+	}
+	if !(0.0 <= *info.Progress && *info.Progress <= 1.0) {
+		return fmt.Errorf("invalid progress: %s", info.String())
+	}
+	if info.ExpirationTime == nil {
+		return fmt.Errorf("expiration time is missing: %s", info.String())
+	}

Review Comment:
   The typical Go style would be to write this chain of `if` checks as a single expression-less `switch`:
   
   ```go
   switch {
   case info.FlightDescriptor == nil:
           return fmt.Errorf.....
   case info.Progress == nil:
           return fmt.Errorf.....
   case !(0.0 <= *info.Progress && *info.Progress <= 1.0):
           return fmt.Errorf.....
   case info.ExpirationTime == nil:
           return fmt.Errorf.....
   }
   ```



##########
go/arrow/internal/flight_integration/scenario.go:
##########
@@ -1131,6 +1134,112 @@ func (tester *expirationTimeRenewFlightEndpointScenarioTester) RunClient(addr st
 	return nil
 }
 
+type pollFlightInfoScenarioTester struct {
+	flight.BaseFlightServer
+}
+
+func (tester *pollFlightInfoScenarioTester) MakeServer(port int) flight.Server {
+	srv := flight.NewServerWithMiddleware(nil)
+	srv.RegisterFlightService(tester)
+	initServer(port, srv)
+	return srv
+}
+
+func (tester *pollFlightInfoScenarioTester) PollFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.RetryInfo, error) {
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "number", Type: arrow.PrimitiveTypes.Uint32},
+		},
+		nil,
+	)
+	endpoints := []*flight.FlightEndpoint{
+		&flight.FlightEndpoint{
+			Ticket:   &flight.Ticket{Ticket: []byte("long-running query")},
+			Location: []*flight.Location{},
+		},
+	}
+	info := &flight.FlightInfo{
+		Schema:           flight.SerializeSchema(schema, memory.DefaultAllocator),
+		FlightDescriptor: desc,
+		Endpoint:         endpoints,
+		TotalRecords:     -1,
+		TotalBytes:       -1,
+	}
+	retryDesc := flight.FlightDescriptor{
+		Type: flight.DescriptorCMD,
+		Cmd:  []byte("retry"),
+	}
+	if desc.Type == retryDesc.Type && string(desc.Cmd) == string(retryDesc.Cmd) {
+		progress := float64(1.0)
+		return &flight.RetryInfo{
+			Info:             info,
+			FlightDescriptor: nil,
+			Progress:         &progress,
+			ExpirationTime:   nil,
+		}, nil
+	} else {
+		progress := float64(0.1)
+		return &flight.RetryInfo{
+			Info:             info,
+			FlightDescriptor: &retryDesc,
+			Progress:         &progress,
+			ExpirationTime:   timestamppb.New(time.Now().Add(time.Second * 10)),
+		}, nil
+	}
+}
+
+func (tester *pollFlightInfoScenarioTester) RunClient(addr string, opts ...grpc.DialOption) error {
+	client, err := flight.NewClientWithMiddleware(addr, nil, nil, opts...)
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+
+	ctx := context.Background()
+	desc := flight.FlightDescriptor{
+		Type: flight.DescriptorCMD,
+		Cmd:  []byte("heavy query"),
+	}
+	info, err := client.PollFlightInfo(ctx, &desc)
+	if err != nil {
+		return err
+	}
+	if info.FlightDescriptor == nil {
+		return fmt.Errorf("description is missing: %s", info.String())
+	}
+	if info.Progress == nil {
+		return fmt.Errorf("progress is missing: %s", info.String())
+	}
+	if !(0.0 <= *info.Progress && *info.Progress <= 1.0) {
+		return fmt.Errorf("invalid progress: %s", info.String())
+	}
+	if info.ExpirationTime == nil {
+		return fmt.Errorf("expiration time is missing: %s", info.String())
+	}
+
+	info, err = client.PollFlightInfo(ctx, info.FlightDescriptor)
+	if err != nil {
+		return err
+	}
+	if info.FlightDescriptor != nil {
+		return fmt.Errorf("retried but no finished yet: %s", info.String())
+	}
+	if info.Progress == nil {
+		return fmt.Errorf("progress is missing in finished query: %s",
+			info.String())
+	}
+	if math.Abs(*info.Progress-1.0) > 1e-5 {
+		return fmt.Errorf("progress for finished query isn't 1.0: %s",
+			info.String())
+	}
+	if info.ExpirationTime != nil {
+		return fmt.Errorf("expiration time must not be set for finished query: %s",
+			info.String())
+	}

Review Comment:
   Same as above: this chain of `if` checks would typically be written as a single `switch` in Go.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org