You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2020/09/25 01:34:11 UTC
[arrow] branch master updated: ARROW-8601: [Go][Flight]
Implementations Flight RPC server and client
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c0dd2e2 ARROW-8601: [Go][Flight] Implementations Flight RPC server and client
c0dd2e2 is described below
commit c0dd2e2166f5f3a9c6b6a03c6983bd886de16c65
Author: Matthew Topol <mt...@factset.com>
AuthorDate: Thu Sep 24 20:33:00 2020 -0500
ARROW-8601: [Go][Flight] Implementations Flight RPC server and client
Built out from https://github.com/apache/arrow/pull/6731, with some inspiration from the existing Reader/Writer and the C++ Flight implementation. I still need to build out the tests some more, but I would like to get opinions and thoughts on what I have so far as I continue to build it out.
Closes #8175 from zeroshade/zeroshade/go/flight
Authored-by: Matthew Topol <mt...@factset.com>
Signed-off-by: Wes McKinney <we...@apache.org>
---
format/Flight.proto | 2 +
go/arrow/flight/Flight.pb.go | 1473 +++++++++++++++++++++++++
go/arrow/flight/Flight_grpc.pb.go | 877 +++++++++++++++
go/arrow/flight/client.go | 89 ++
go/arrow/flight/client_auth.go | 91 ++
go/arrow/flight/example_flight_server_test.go | 70 ++
go/arrow/flight/flight_test.go | 305 +++++
go/arrow/{go.mod => flight/gen.go} | 12 +-
go/arrow/flight/server.go | 118 ++
go/arrow/flight/server_auth.go | 145 +++
go/arrow/go.mod | 8 +
go/arrow/go.sum | 94 ++
go/arrow/ipc/flight_data_reader.go | 210 ++++
go/arrow/ipc/flight_data_writer.go | 150 +++
14 files changed, 3634 insertions(+), 10 deletions(-)
diff --git a/format/Flight.proto b/format/Flight.proto
index 71ae7ca..7b0f591 100644
--- a/format/Flight.proto
+++ b/format/Flight.proto
@@ -19,6 +19,8 @@
syntax = "proto3";
option java_package = "org.apache.arrow.flight.impl";
+option go_package = "github.com/apache/arrow/go/flight;flight";
+
package arrow.flight.protocol;
/*
diff --git a/go/arrow/flight/Flight.pb.go b/go/arrow/flight/Flight.pb.go
new file mode 100644
index 0000000..75c6c2c
--- /dev/null
+++ b/go/arrow/flight/Flight.pb.go
@@ -0,0 +1,1473 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.25.0
+// protoc v3.9.1
+// source: Flight.proto
+
+package flight
+
+import (
+ proto "github.com/golang/protobuf/proto"
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// This is a compile-time assertion that a sufficiently up-to-date version
+// of the legacy proto package is being used.
+const _ = proto.ProtoPackageIsVersion4
+
+//
+// FlightDescriptor_DescriptorType describes what type of descriptor is defined.
+type FlightDescriptor_DescriptorType int32
+
+const (
+ // Protobuf pattern, not used.
+ FlightDescriptor_UNKNOWN FlightDescriptor_DescriptorType = 0
+ //
+ // A named path that identifies a dataset. A path is composed of a string
+ // or list of strings describing a particular dataset. This is conceptually
+ // similar to a path inside a filesystem.
+ FlightDescriptor_PATH FlightDescriptor_DescriptorType = 1
+ //
+ // An opaque command to generate a dataset.
+ FlightDescriptor_CMD FlightDescriptor_DescriptorType = 2
+)
+
+// Enum value maps for FlightDescriptor_DescriptorType.
+var (
+ FlightDescriptor_DescriptorType_name = map[int32]string{
+ 0: "UNKNOWN",
+ 1: "PATH",
+ 2: "CMD",
+ }
+ FlightDescriptor_DescriptorType_value = map[string]int32{
+ "UNKNOWN": 0,
+ "PATH": 1,
+ "CMD": 2,
+ }
+)
+
+func (x FlightDescriptor_DescriptorType) Enum() *FlightDescriptor_DescriptorType {
+ p := new(FlightDescriptor_DescriptorType)
+ *p = x
+ return p
+}
+
+func (x FlightDescriptor_DescriptorType) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (FlightDescriptor_DescriptorType) Descriptor() protoreflect.EnumDescriptor {
+ return file_Flight_proto_enumTypes[0].Descriptor()
+}
+
+func (FlightDescriptor_DescriptorType) Type() protoreflect.EnumType {
+ return &file_Flight_proto_enumTypes[0]
+}
+
+func (x FlightDescriptor_DescriptorType) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use FlightDescriptor_DescriptorType.Descriptor instead.
+func (FlightDescriptor_DescriptorType) EnumDescriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{9, 0}
+}
+
+//
+// HandshakeRequest is the request that a client provides to a server on handshake.
+type HandshakeRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ //
+ // A defined protocol version.
+ ProtocolVersion uint64 `protobuf:"varint,1,opt,name=protocol_version,json=protocolVersion,proto3" json:"protocol_version,omitempty"`
+ //
+ // Arbitrary auth/handshake info.
+ Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
+}
+
+func (x *HandshakeRequest) Reset() {
+ *x = HandshakeRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HandshakeRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HandshakeRequest) ProtoMessage() {}
+
+func (x *HandshakeRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HandshakeRequest.ProtoReflect.Descriptor instead.
+func (*HandshakeRequest) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *HandshakeRequest) GetProtocolVersion() uint64 {
+ if x != nil {
+ return x.ProtocolVersion
+ }
+ return 0
+}
+
+func (x *HandshakeRequest) GetPayload() []byte {
+ if x != nil {
+ return x.Payload
+ }
+ return nil
+}
+
+type HandshakeResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ //
+ // A defined protocol version
+ ProtocolVersion uint64 `protobuf:"varint,1,opt,name=protocol_version,json=protocolVersion,proto3" json:"protocol_version,omitempty"`
+ //
+ // Arbitrary auth/handshake info.
+ Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
+}
+
+func (x *HandshakeResponse) Reset() {
+ *x = HandshakeResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HandshakeResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HandshakeResponse) ProtoMessage() {}
+
+func (x *HandshakeResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HandshakeResponse.ProtoReflect.Descriptor instead.
+func (*HandshakeResponse) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *HandshakeResponse) GetProtocolVersion() uint64 {
+ if x != nil {
+ return x.ProtocolVersion
+ }
+ return 0
+}
+
+func (x *HandshakeResponse) GetPayload() []byte {
+ if x != nil {
+ return x.Payload
+ }
+ return nil
+}
+
+//
+// BasicAuth is a message for doing simple auth.
+type BasicAuth struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Username string `protobuf:"bytes,2,opt,name=username,proto3" json:"username,omitempty"`
+ Password string `protobuf:"bytes,3,opt,name=password,proto3" json:"password,omitempty"`
+}
+
+func (x *BasicAuth) Reset() {
+ *x = BasicAuth{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *BasicAuth) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*BasicAuth) ProtoMessage() {}
+
+func (x *BasicAuth) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use BasicAuth.ProtoReflect.Descriptor instead.
+func (*BasicAuth) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *BasicAuth) GetUsername() string {
+ if x != nil {
+ return x.Username
+ }
+ return ""
+}
+
+func (x *BasicAuth) GetPassword() string {
+ if x != nil {
+ return x.Password
+ }
+ return ""
+}
+
+type Empty struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+}
+
+func (x *Empty) Reset() {
+ *x = Empty{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Empty) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Empty) ProtoMessage() {}
+
+func (x *Empty) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Empty.ProtoReflect.Descriptor instead.
+func (*Empty) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{3}
+}
+
+//
+// ActionType describes an available action, including both the name used for
+// execution along with a short description of the purpose of the action.
+type ActionType struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
+ Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"`
+}
+
+func (x *ActionType) Reset() {
+ *x = ActionType{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ActionType) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ActionType) ProtoMessage() {}
+
+func (x *ActionType) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[4]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ActionType.ProtoReflect.Descriptor instead.
+func (*ActionType) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *ActionType) GetType() string {
+ if x != nil {
+ return x.Type
+ }
+ return ""
+}
+
+func (x *ActionType) GetDescription() string {
+ if x != nil {
+ return x.Description
+ }
+ return ""
+}
+
+//
+// Criteria is a service specific expression that can be used to return a
+// limited set of available Arrow Flight streams.
+type Criteria struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Expression []byte `protobuf:"bytes,1,opt,name=expression,proto3" json:"expression,omitempty"`
+}
+
+func (x *Criteria) Reset() {
+ *x = Criteria{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Criteria) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Criteria) ProtoMessage() {}
+
+func (x *Criteria) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[5]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Criteria.ProtoReflect.Descriptor instead.
+func (*Criteria) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *Criteria) GetExpression() []byte {
+ if x != nil {
+ return x.Expression
+ }
+ return nil
+}
+
+//
+// Action is an opaque action specific for the service.
+type Action struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
+ Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"`
+}
+
+func (x *Action) Reset() {
+ *x = Action{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[6]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Action) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Action) ProtoMessage() {}
+
+func (x *Action) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[6]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Action.ProtoReflect.Descriptor instead.
+func (*Action) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{6}
+}
+
+func (x *Action) GetType() string {
+ if x != nil {
+ return x.Type
+ }
+ return ""
+}
+
+func (x *Action) GetBody() []byte {
+ if x != nil {
+ return x.Body
+ }
+ return nil
+}
+
+//
+// Result is an opaque result returned after executing an action.
+type Result struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Body []byte `protobuf:"bytes,1,opt,name=body,proto3" json:"body,omitempty"`
+}
+
+func (x *Result) Reset() {
+ *x = Result{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[7]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Result) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Result) ProtoMessage() {}
+
+func (x *Result) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[7]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Result.ProtoReflect.Descriptor instead.
+func (*Result) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{7}
+}
+
+func (x *Result) GetBody() []byte {
+ if x != nil {
+ return x.Body
+ }
+ return nil
+}
+
+//
+// SchemaResult wraps the result of a getSchema call.
+type SchemaResult struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Schema of the dataset as described in Schema.fbs::Schema.
+ Schema []byte `protobuf:"bytes,1,opt,name=schema,proto3" json:"schema,omitempty"`
+}
+
+func (x *SchemaResult) Reset() {
+ *x = SchemaResult{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[8]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SchemaResult) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SchemaResult) ProtoMessage() {}
+
+func (x *SchemaResult) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[8]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SchemaResult.ProtoReflect.Descriptor instead.
+func (*SchemaResult) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{8}
+}
+
+func (x *SchemaResult) GetSchema() []byte {
+ if x != nil {
+ return x.Schema
+ }
+ return nil
+}
+
+//
+// FlightDescriptor is the name or tag for a Flight. It may be used to retrieve or
+// generate a flight, or to expose a set of previously defined flights.
+type FlightDescriptor struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Type FlightDescriptor_DescriptorType `protobuf:"varint,1,opt,name=type,proto3,enum=arrow.flight.protocol.FlightDescriptor_DescriptorType" json:"type,omitempty"`
+ //
+ // Opaque value used to express a command. Should only be defined when
+ // type = CMD.
+ Cmd []byte `protobuf:"bytes,2,opt,name=cmd,proto3" json:"cmd,omitempty"`
+ //
+ // List of strings identifying a particular dataset. Should only be defined
+ // when type = PATH.
+ Path []string `protobuf:"bytes,3,rep,name=path,proto3" json:"path,omitempty"`
+}
+
+func (x *FlightDescriptor) Reset() {
+ *x = FlightDescriptor{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[9]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FlightDescriptor) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FlightDescriptor) ProtoMessage() {}
+
+func (x *FlightDescriptor) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[9]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FlightDescriptor.ProtoReflect.Descriptor instead.
+func (*FlightDescriptor) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{9}
+}
+
+func (x *FlightDescriptor) GetType() FlightDescriptor_DescriptorType {
+ if x != nil {
+ return x.Type
+ }
+ return FlightDescriptor_UNKNOWN
+}
+
+func (x *FlightDescriptor) GetCmd() []byte {
+ if x != nil {
+ return x.Cmd
+ }
+ return nil
+}
+
+func (x *FlightDescriptor) GetPath() []string {
+ if x != nil {
+ return x.Path
+ }
+ return nil
+}
+
+//
+// FlightInfo holds the access coordinates for retrieval of a dataset. With a
+// FlightInfo, a consumer is able to determine how to retrieve a dataset.
+type FlightInfo struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Schema of the dataset as described in Schema.fbs::Schema.
+ Schema []byte `protobuf:"bytes,1,opt,name=schema,proto3" json:"schema,omitempty"`
+ //
+ // The descriptor associated with this info.
+ FlightDescriptor *FlightDescriptor `protobuf:"bytes,2,opt,name=flight_descriptor,json=flightDescriptor,proto3" json:"flight_descriptor,omitempty"`
+ //
+ // A list of endpoints associated with the flight. To consume the whole
+ // flight, all endpoints must be consumed.
+ Endpoint []*FlightEndpoint `protobuf:"bytes,3,rep,name=endpoint,proto3" json:"endpoint,omitempty"`
+ // Set TotalRecords and TotalBytes to -1 if unknown.
+ TotalRecords int64 `protobuf:"varint,4,opt,name=total_records,json=totalRecords,proto3" json:"total_records,omitempty"`
+ TotalBytes int64 `protobuf:"varint,5,opt,name=total_bytes,json=totalBytes,proto3" json:"total_bytes,omitempty"`
+}
+
+func (x *FlightInfo) Reset() {
+ *x = FlightInfo{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[10]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FlightInfo) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FlightInfo) ProtoMessage() {}
+
+func (x *FlightInfo) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[10]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FlightInfo.ProtoReflect.Descriptor instead.
+func (*FlightInfo) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{10}
+}
+
+func (x *FlightInfo) GetSchema() []byte {
+ if x != nil {
+ return x.Schema
+ }
+ return nil
+}
+
+func (x *FlightInfo) GetFlightDescriptor() *FlightDescriptor {
+ if x != nil {
+ return x.FlightDescriptor
+ }
+ return nil
+}
+
+func (x *FlightInfo) GetEndpoint() []*FlightEndpoint {
+ if x != nil {
+ return x.Endpoint
+ }
+ return nil
+}
+
+func (x *FlightInfo) GetTotalRecords() int64 {
+ if x != nil {
+ return x.TotalRecords
+ }
+ return 0
+}
+
+func (x *FlightInfo) GetTotalBytes() int64 {
+ if x != nil {
+ return x.TotalBytes
+ }
+ return 0
+}
+
+//
+// FlightEndpoint is a particular stream or split associated with a flight.
+type FlightEndpoint struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ //
+ // Token used to retrieve this stream.
+ Ticket *Ticket `protobuf:"bytes,1,opt,name=ticket,proto3" json:"ticket,omitempty"`
+ //
+ // A list of URIs where this ticket can be redeemed. If the list is
+ // empty, the expectation is that the ticket can only be redeemed on the
+ // current service where the ticket was generated.
+ Location []*Location `protobuf:"bytes,2,rep,name=location,proto3" json:"location,omitempty"`
+}
+
+func (x *FlightEndpoint) Reset() {
+ *x = FlightEndpoint{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[11]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FlightEndpoint) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FlightEndpoint) ProtoMessage() {}
+
+func (x *FlightEndpoint) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[11]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FlightEndpoint.ProtoReflect.Descriptor instead.
+func (*FlightEndpoint) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{11}
+}
+
+func (x *FlightEndpoint) GetTicket() *Ticket {
+ if x != nil {
+ return x.Ticket
+ }
+ return nil
+}
+
+func (x *FlightEndpoint) GetLocation() []*Location {
+ if x != nil {
+ return x.Location
+ }
+ return nil
+}
+
+//
+// Location is a location where a Flight service will accept retrieval of a
+// particular stream given a ticket.
+type Location struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Uri string `protobuf:"bytes,1,opt,name=uri,proto3" json:"uri,omitempty"`
+}
+
+func (x *Location) Reset() {
+ *x = Location{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[12]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Location) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Location) ProtoMessage() {}
+
+func (x *Location) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[12]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Location.ProtoReflect.Descriptor instead.
+func (*Location) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{12}
+}
+
+func (x *Location) GetUri() string {
+ if x != nil {
+ return x.Uri
+ }
+ return ""
+}
+
+//
+// Ticket is an opaque identifier that the service can use to retrieve a
+// particular portion of a stream.
+type Ticket struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Ticket []byte `protobuf:"bytes,1,opt,name=ticket,proto3" json:"ticket,omitempty"`
+}
+
+func (x *Ticket) Reset() {
+ *x = Ticket{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[13]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Ticket) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Ticket) ProtoMessage() {}
+
+func (x *Ticket) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[13]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Ticket.ProtoReflect.Descriptor instead.
+func (*Ticket) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{13}
+}
+
+func (x *Ticket) GetTicket() []byte {
+ if x != nil {
+ return x.Ticket
+ }
+ return nil
+}
+
+//
+// FlightData is a batch of Arrow data as part of a stream of batches.
+type FlightData struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ //
+ // The descriptor of the data. This is only relevant when a client is
+ // starting a new DoPut stream.
+ FlightDescriptor *FlightDescriptor `protobuf:"bytes,1,opt,name=flight_descriptor,json=flightDescriptor,proto3" json:"flight_descriptor,omitempty"`
+ //
+ // Header for message data as described in Message.fbs::Message.
+ DataHeader []byte `protobuf:"bytes,2,opt,name=data_header,json=dataHeader,proto3" json:"data_header,omitempty"`
+ //
+ // Application-defined metadata.
+ AppMetadata []byte `protobuf:"bytes,3,opt,name=app_metadata,json=appMetadata,proto3" json:"app_metadata,omitempty"`
+ //
+ // The actual batch of Arrow data, preferably handled with minimal copies.
+ // It comes last in the definition to help with sidecar patterns (it is
+ // expected that some implementations will fetch this field off the wire
+ // with specialized code to avoid extra memory copies).
+ DataBody []byte `protobuf:"bytes,1000,opt,name=data_body,json=dataBody,proto3" json:"data_body,omitempty"`
+}
+
+func (x *FlightData) Reset() {
+ *x = FlightData{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[14]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FlightData) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FlightData) ProtoMessage() {}
+
+func (x *FlightData) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[14]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FlightData.ProtoReflect.Descriptor instead.
+func (*FlightData) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{14}
+}
+
+func (x *FlightData) GetFlightDescriptor() *FlightDescriptor {
+ if x != nil {
+ return x.FlightDescriptor
+ }
+ return nil
+}
+
+func (x *FlightData) GetDataHeader() []byte {
+ if x != nil {
+ return x.DataHeader
+ }
+ return nil
+}
+
+func (x *FlightData) GetAppMetadata() []byte {
+ if x != nil {
+ return x.AppMetadata
+ }
+ return nil
+}
+
+func (x *FlightData) GetDataBody() []byte {
+ if x != nil {
+ return x.DataBody
+ }
+ return nil
+}
+
+//
+// PutResult is the response message associated with the submission of a DoPut.
+type PutResult struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ AppMetadata []byte `protobuf:"bytes,1,opt,name=app_metadata,json=appMetadata,proto3" json:"app_metadata,omitempty"`
+}
+
+func (x *PutResult) Reset() {
+ *x = PutResult{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_Flight_proto_msgTypes[15]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *PutResult) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PutResult) ProtoMessage() {}
+
+func (x *PutResult) ProtoReflect() protoreflect.Message {
+ mi := &file_Flight_proto_msgTypes[15]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use PutResult.ProtoReflect.Descriptor instead.
+func (*PutResult) Descriptor() ([]byte, []int) {
+ return file_Flight_proto_rawDescGZIP(), []int{15}
+}
+
+func (x *PutResult) GetAppMetadata() []byte {
+ if x != nil {
+ return x.AppMetadata
+ }
+ return nil
+}
+
+var File_Flight_proto protoreflect.FileDescriptor
+
+var file_Flight_proto_rawDesc = []byte{
+ 0x0a, 0x0c, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x15,
+ 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0x57, 0x0a, 0x10, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61,
+ 0x6b, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x10, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20,
+ 0x01, 0x28, 0x04, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72,
+ 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18,
+ 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x58,
+ 0x0a, 0x11, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
+ 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x10, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x5f,
+ 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0f, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18,
+ 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52,
+ 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x43, 0x0a, 0x09, 0x42, 0x61, 0x73, 0x69,
+ 0x63, 0x41, 0x75, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d,
+ 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d,
+ 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x03, 0x20,
+ 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x07, 0x0a,
+ 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x42, 0x0a, 0x0a, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e,
+ 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63,
+ 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64,
+ 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x2a, 0x0a, 0x08, 0x43, 0x72,
+ 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x12, 0x1e, 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x72, 0x65, 0x73,
+ 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x65, 0x78, 0x70, 0x72,
+ 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x30, 0x0a, 0x06, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e,
+ 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
+ 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01,
+ 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0x1c, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75,
+ 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c,
+ 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0x26, 0x0a, 0x0c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61,
+ 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x22, 0xb6,
+ 0x01, 0x0a, 0x10, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70,
+ 0x74, 0x6f, 0x72, 0x12, 0x4a, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x0e, 0x32, 0x36, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74,
+ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74,
+ 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x2e, 0x44, 0x65, 0x73, 0x63, 0x72,
+ 0x69, 0x70, 0x74, 0x6f, 0x72, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12,
+ 0x10, 0x0a, 0x03, 0x63, 0x6d, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x63, 0x6d,
+ 0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52,
+ 0x04, 0x70, 0x61, 0x74, 0x68, 0x22, 0x30, 0x0a, 0x0e, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70,
+ 0x74, 0x6f, 0x72, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f,
+ 0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x41, 0x54, 0x48, 0x10, 0x01, 0x12, 0x07,
+ 0x0a, 0x03, 0x43, 0x4d, 0x44, 0x10, 0x02, 0x22, 0x83, 0x02, 0x0a, 0x0a, 0x46, 0x6c, 0x69, 0x67,
+ 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x54,
+ 0x0a, 0x11, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70,
+ 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f,
+ 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f,
+ 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
+ 0x6f, 0x72, 0x52, 0x10, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69,
+ 0x70, 0x74, 0x6f, 0x72, 0x12, 0x41, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74,
+ 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66,
+ 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46,
+ 0x6c, 0x69, 0x67, 0x68, 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x08, 0x65,
+ 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x74, 0x6f, 0x74, 0x61, 0x6c,
+ 0x5f, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c,
+ 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x1f, 0x0a, 0x0b,
+ 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28,
+ 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x84, 0x01,
+ 0x0a, 0x0e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74,
+ 0x12, 0x35, 0x0a, 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x52,
+ 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x3b, 0x0a, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74,
+ 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x61, 0x72, 0x72, 0x6f,
+ 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f,
+ 0x6c, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x61,
+ 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1c, 0x0a, 0x08, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e,
+ 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x69, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75,
+ 0x72, 0x69, 0x22, 0x20, 0x0a, 0x06, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06,
+ 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x69,
+ 0x63, 0x6b, 0x65, 0x74, 0x22, 0xc4, 0x01, 0x0a, 0x0a, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44,
+ 0x61, 0x74, 0x61, 0x12, 0x54, 0x0a, 0x11, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x5f, 0x64, 0x65,
+ 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27,
+ 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73,
+ 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x52, 0x10, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44,
+ 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74,
+ 0x61, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a,
+ 0x64, 0x61, 0x74, 0x61, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70,
+ 0x70, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c,
+ 0x52, 0x0b, 0x61, 0x70, 0x70, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a,
+ 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x62, 0x6f, 0x64, 0x79, 0x18, 0xe8, 0x07, 0x20, 0x01, 0x28,
+ 0x0c, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x42, 0x6f, 0x64, 0x79, 0x22, 0x2e, 0x0a, 0x09, 0x50,
+ 0x75, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70, 0x70, 0x5f,
+ 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b,
+ 0x61, 0x70, 0x70, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x32, 0xa7, 0x06, 0x0a, 0x0d,
+ 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x64, 0x0a,
+ 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72,
+ 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
+ 0x6f, 0x6c, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, 0x65, 0x71, 0x75,
+ 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67,
+ 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x61, 0x6e, 0x64,
+ 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28,
+ 0x01, 0x30, 0x01, 0x12, 0x55, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x6c, 0x69, 0x67, 0x68,
+ 0x74, 0x73, 0x12, 0x1f, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68,
+ 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x43, 0x72, 0x69, 0x74, 0x65,
+ 0x72, 0x69, 0x61, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67,
+ 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67,
+ 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x30, 0x01, 0x12, 0x5d, 0x0a, 0x0d, 0x47, 0x65,
+ 0x74, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x27, 0x2e, 0x61, 0x72,
+ 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69,
+ 0x70, 0x74, 0x6f, 0x72, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69,
+ 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69,
+ 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x5b, 0x0a, 0x09, 0x47, 0x65, 0x74,
+ 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66,
+ 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46,
+ 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x1a,
+ 0x23, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65,
+ 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, 0x12, 0x4d, 0x0a, 0x05, 0x44, 0x6f, 0x47, 0x65, 0x74, 0x12,
+ 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x21,
+ 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74,
+ 0x61, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x05, 0x44, 0x6f, 0x50, 0x75, 0x74, 0x12, 0x21,
+ 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74,
+ 0x61, 0x1a, 0x20, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74,
+ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73,
+ 0x75, 0x6c, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x0a, 0x44, 0x6f, 0x45,
+ 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e,
+ 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e,
+ 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72,
+ 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
+ 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x22, 0x00, 0x28,
+ 0x01, 0x30, 0x01, 0x12, 0x4c, 0x0a, 0x08, 0x44, 0x6f, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12,
+ 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x1d,
+ 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, 0x30,
+ 0x01, 0x12, 0x52, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73,
+ 0x12, 0x1c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21,
+ 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70,
+ 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x48, 0x0a, 0x1c, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
+ 0x63, 0x68, 0x65, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74,
+ 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
+ 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x67,
+ 0x6f, 0x2f, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x3b, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x62,
+ 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+// State for the lazily compressed descriptor: file_Flight_proto_rawDescData
+// starts out aliasing file_Flight_proto_rawDesc, and
+// file_Flight_proto_rawDescOnce guards its one-time replacement inside
+// file_Flight_proto_rawDescGZIP.
+var (
+ file_Flight_proto_rawDescOnce sync.Once
+ file_Flight_proto_rawDescData = file_Flight_proto_rawDesc
+)
+
+// file_Flight_proto_rawDescGZIP returns the descriptor bytes after they have
+// been passed through protoimpl.X.CompressGZIP. The compression step runs at
+// most once (guarded by file_Flight_proto_rawDescOnce); every later call
+// returns the cached result.
+func file_Flight_proto_rawDescGZIP() []byte {
+	compress := func() {
+		file_Flight_proto_rawDescData = protoimpl.X.CompressGZIP(file_Flight_proto_rawDescData)
+	}
+	file_Flight_proto_rawDescOnce.Do(compress)
+	return file_Flight_proto_rawDescData
+}
+
+// Type-info slots for the file's 1 enum and 16 messages. Both slices are
+// passed to protoimpl.TypeBuilder in file_Flight_proto_init.
+var file_Flight_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_Flight_proto_msgTypes = make([]protoimpl.MessageInfo, 16)
+// file_Flight_proto_goTypes lists one Go type per enum and message declared in
+// Flight.proto. The trailing comment on each entry gives its index and its
+// full protobuf name; file_Flight_proto_depIdxs refers to entries by that
+// index. The slice is set to nil at the end of file_Flight_proto_init.
+var file_Flight_proto_goTypes = []interface{}{
+ (FlightDescriptor_DescriptorType)(0), // 0: arrow.flight.protocol.FlightDescriptor.DescriptorType
+ (*HandshakeRequest)(nil), // 1: arrow.flight.protocol.HandshakeRequest
+ (*HandshakeResponse)(nil), // 2: arrow.flight.protocol.HandshakeResponse
+ (*BasicAuth)(nil), // 3: arrow.flight.protocol.BasicAuth
+ (*Empty)(nil), // 4: arrow.flight.protocol.Empty
+ (*ActionType)(nil), // 5: arrow.flight.protocol.ActionType
+ (*Criteria)(nil), // 6: arrow.flight.protocol.Criteria
+ (*Action)(nil), // 7: arrow.flight.protocol.Action
+ (*Result)(nil), // 8: arrow.flight.protocol.Result
+ (*SchemaResult)(nil), // 9: arrow.flight.protocol.SchemaResult
+ (*FlightDescriptor)(nil), // 10: arrow.flight.protocol.FlightDescriptor
+ (*FlightInfo)(nil), // 11: arrow.flight.protocol.FlightInfo
+ (*FlightEndpoint)(nil), // 12: arrow.flight.protocol.FlightEndpoint
+ (*Location)(nil), // 13: arrow.flight.protocol.Location
+ (*Ticket)(nil), // 14: arrow.flight.protocol.Ticket
+ (*FlightData)(nil), // 15: arrow.flight.protocol.FlightData
+ (*PutResult)(nil), // 16: arrow.flight.protocol.PutResult
+}
+// file_Flight_proto_depIdxs holds indexes into file_Flight_proto_goTypes.
+// Per the per-entry annotations: entries 0-5 resolve the types of message
+// fields, entries 6-14 the input types of the FlightService methods, and
+// entries 15-23 their output types. The last five entries are the start
+// offsets of each sub-list (the two extension sub-lists are empty).
+var file_Flight_proto_depIdxs = []int32{
+ 0, // 0: arrow.flight.protocol.FlightDescriptor.type:type_name -> arrow.flight.protocol.FlightDescriptor.DescriptorType
+ 10, // 1: arrow.flight.protocol.FlightInfo.flight_descriptor:type_name -> arrow.flight.protocol.FlightDescriptor
+ 12, // 2: arrow.flight.protocol.FlightInfo.endpoint:type_name -> arrow.flight.protocol.FlightEndpoint
+ 14, // 3: arrow.flight.protocol.FlightEndpoint.ticket:type_name -> arrow.flight.protocol.Ticket
+ 13, // 4: arrow.flight.protocol.FlightEndpoint.location:type_name -> arrow.flight.protocol.Location
+ 10, // 5: arrow.flight.protocol.FlightData.flight_descriptor:type_name -> arrow.flight.protocol.FlightDescriptor
+ 1, // 6: arrow.flight.protocol.FlightService.Handshake:input_type -> arrow.flight.protocol.HandshakeRequest
+ 6, // 7: arrow.flight.protocol.FlightService.ListFlights:input_type -> arrow.flight.protocol.Criteria
+ 10, // 8: arrow.flight.protocol.FlightService.GetFlightInfo:input_type -> arrow.flight.protocol.FlightDescriptor
+ 10, // 9: arrow.flight.protocol.FlightService.GetSchema:input_type -> arrow.flight.protocol.FlightDescriptor
+ 14, // 10: arrow.flight.protocol.FlightService.DoGet:input_type -> arrow.flight.protocol.Ticket
+ 15, // 11: arrow.flight.protocol.FlightService.DoPut:input_type -> arrow.flight.protocol.FlightData
+ 15, // 12: arrow.flight.protocol.FlightService.DoExchange:input_type -> arrow.flight.protocol.FlightData
+ 7, // 13: arrow.flight.protocol.FlightService.DoAction:input_type -> arrow.flight.protocol.Action
+ 4, // 14: arrow.flight.protocol.FlightService.ListActions:input_type -> arrow.flight.protocol.Empty
+ 2, // 15: arrow.flight.protocol.FlightService.Handshake:output_type -> arrow.flight.protocol.HandshakeResponse
+ 11, // 16: arrow.flight.protocol.FlightService.ListFlights:output_type -> arrow.flight.protocol.FlightInfo
+ 11, // 17: arrow.flight.protocol.FlightService.GetFlightInfo:output_type -> arrow.flight.protocol.FlightInfo
+ 9, // 18: arrow.flight.protocol.FlightService.GetSchema:output_type -> arrow.flight.protocol.SchemaResult
+ 15, // 19: arrow.flight.protocol.FlightService.DoGet:output_type -> arrow.flight.protocol.FlightData
+ 16, // 20: arrow.flight.protocol.FlightService.DoPut:output_type -> arrow.flight.protocol.PutResult
+ 15, // 21: arrow.flight.protocol.FlightService.DoExchange:output_type -> arrow.flight.protocol.FlightData
+ 8, // 22: arrow.flight.protocol.FlightService.DoAction:output_type -> arrow.flight.protocol.Result
+ 5, // 23: arrow.flight.protocol.FlightService.ListActions:output_type -> arrow.flight.protocol.ActionType
+ 15, // [15:24] is the sub-list for method output_type
+ 6, // [6:15] is the sub-list for method input_type
+ 6, // [6:6] is the sub-list for extension type_name
+ 6, // [6:6] is the sub-list for extension extendee
+ 0, // [0:6] is the sub-list for field type_name
+}
+
+// init runs file_Flight_proto_init when the package is loaded.
+func init() {
+	file_Flight_proto_init()
+}
+// file_Flight_proto_init builds the type information for Flight.proto and
+// stores the resulting file descriptor in File_Flight_proto.
+//
+// It is a no-op when File_Flight_proto is already set. Otherwise it
+// (1) installs per-message Exporter callbacks when protoimpl.UnsafeEnabled is
+// false, (2) runs protoimpl.TypeBuilder over the raw descriptor, Go type list
+// and dependency indexes, and (3) clears the package-level references to
+// those build inputs.
+func file_Flight_proto_init() {
+ if File_Flight_proto != nil {
+ return
+ }
+ // Without unsafe support, every message type gets an Exporter that hands
+ // out pointers to its internal fields by index:
+ // 0 -> state, 1 -> sizeCache, 2 -> unknownFields, anything else -> nil.
+ // The msgTypes index of each message is its goTypes index minus one
+ // (goTypes[0] is the enum).
+ if !protoimpl.UnsafeEnabled {
+ file_Flight_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*HandshakeRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*HandshakeResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*BasicAuth); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Empty); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ActionType); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Criteria); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Action); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Result); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SchemaResult); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FlightDescriptor); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FlightInfo); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FlightEndpoint); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Location); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Ticket); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FlightData); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_Flight_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*PutResult); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ // x exists only so that reflect can report this package's import path.
+ type x struct{}
+ // The counts below match the tables above: 1 enum, 16 messages,
+ // no extensions, 1 service (FlightService).
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_Flight_proto_rawDesc,
+ NumEnums: 1,
+ NumMessages: 16,
+ NumExtensions: 0,
+ NumServices: 1,
+ },
+ GoTypes: file_Flight_proto_goTypes,
+ DependencyIndexes: file_Flight_proto_depIdxs,
+ EnumInfos: file_Flight_proto_enumTypes,
+ MessageInfos: file_Flight_proto_msgTypes,
+ }.Build()
+ File_Flight_proto = out.File
+ // Drop the package-level references to the build inputs (presumably so
+ // they can be garbage-collected; TODO confirm). Note that
+ // file_Flight_proto_rawDescData was initialized from the same
+ // rawDesc slice and is not cleared here.
+ file_Flight_proto_rawDesc = nil
+ file_Flight_proto_goTypes = nil
+ file_Flight_proto_depIdxs = nil
+}
diff --git a/go/arrow/flight/Flight_grpc.pb.go b/go/arrow/flight/Flight_grpc.pb.go
new file mode 100644
index 0000000..c2b98d5
--- /dev/null
+++ b/go/arrow/flight/Flight_grpc.pb.go
@@ -0,0 +1,877 @@
+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+
+package flight
+
+import (
+ context "context"
+ grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+)
+
+// Compile-time assertion that this generated file is compatible with the grpc
+// package it is being compiled against: the build fails if that package does
+// not define grpc.SupportPackageIsVersion7.
+const _ = grpc.SupportPackageIsVersion7
+
+// FlightServiceClient is the client API for FlightService service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+type FlightServiceClient interface {
+ // Handshake opens the handshake stream between client and server.
+ // Depending on the server, the handshake may be required to determine the
+ // token that should be used for future operations. Both request and
+ // response are streams to allow multiple round-trips depending on the auth
+ // mechanism.
+ Handshake(ctx context.Context, opts ...grpc.CallOption) (FlightService_HandshakeClient, error)
+ // ListFlights gets a list of available streams matching the given
+ // criteria. Most flight services will expose one or more streams that are
+ // readily available for retrieval; this API allows listing the streams
+ // available for consumption. The criteria can limit the subset of streams
+ // that are listed. Each flight service allows its own definition of how to
+ // consume criteria.
+ ListFlights(ctx context.Context, in *Criteria, opts ...grpc.CallOption) (FlightService_ListFlightsClient, error)
+ // GetFlightInfo returns, for a given FlightDescriptor, information about
+ // how the flight can be consumed. This is useful if the consumer can
+ // already identify the specific flight to consume. It can also allow a
+ // consumer to generate a flight stream through a specified descriptor: for
+ // example, a flight descriptor might include a SQL statement or a pickled
+ // Python operation that will be executed. In those cases, the descriptor
+ // will not be previously available within the list of available streams
+ // provided by ListFlights but will be available for consumption for the
+ // duration defined by the specific flight service.
+ GetFlightInfo(ctx context.Context, in *FlightDescriptor, opts ...grpc.CallOption) (*FlightInfo, error)
+ // GetSchema returns, for a given FlightDescriptor, the Schema as described
+ // in Schema.fbs::Schema. It is used when a consumer needs the Schema of a
+ // flight stream. Similar to GetFlightInfo, this call may generate a new
+ // flight that was not previously available in ListFlights.
+ GetSchema(ctx context.Context, in *FlightDescriptor, opts ...grpc.CallOption) (*SchemaResult, error)
+ // DoGet retrieves a single stream associated with a particular descriptor
+ // associated with the referenced ticket. A Flight can be composed of one or
+ // more streams where each stream can be retrieved using a separate opaque
+ // ticket that the flight service uses for managing a collection of streams.
+ DoGet(ctx context.Context, in *Ticket, opts ...grpc.CallOption) (FlightService_DoGetClient, error)
+ // DoPut pushes a stream to the flight service associated with a particular
+ // flight stream. This allows a client of a flight service to upload a
+ // stream of data. Depending on the particular flight service, a client
+ // consumer could be allowed to upload a single stream per descriptor or an
+ // unlimited number. In the latter case, the service might implement a
+ // 'seal' action that can be applied to a descriptor once all streams are
+ // uploaded.
+ DoPut(ctx context.Context, opts ...grpc.CallOption) (FlightService_DoPutClient, error)
+ // DoExchange opens a bidirectional data channel for a given descriptor.
+ // This allows clients to send and receive arbitrary Arrow data and
+ // application-specific metadata in a single logical stream. In contrast to
+ // DoGet/DoPut, this is more suited for clients offloading computation
+ // (rather than storage) to a Flight service.
+ DoExchange(ctx context.Context, opts ...grpc.CallOption) (FlightService_DoExchangeClient, error)
+ // DoAction performs a specific action against a flight service. Flight
+ // services can support an arbitrary number of simple actions in addition to
+ // the possible ListFlights, GetFlightInfo, DoGet, DoPut operations that are
+ // potentially available. An action includes opaque request and response
+ // objects that are specific to the type of action being undertaken.
+ DoAction(ctx context.Context, in *Action, opts ...grpc.CallOption) (FlightService_DoActionClient, error)
+ // ListActions lists all of the available action types that the flight
+ // service exposes, along with their descriptions. This allows different
+ // flight consumers to understand the capabilities of the flight service.
+ ListActions(ctx context.Context, in *Empty, opts ...grpc.CallOption) (FlightService_ListActionsClient, error)
+}
+
+// flightServiceClient is the concrete type returned by
+// NewFlightServiceClient; its methods issue their RPCs through cc.
+type flightServiceClient struct {
+ cc grpc.ClientConnInterface
+}
+
+// NewFlightServiceClient returns a FlightServiceClient that sends its RPCs
+// over the supplied connection.
+func NewFlightServiceClient(cc grpc.ClientConnInterface) FlightServiceClient {
+	client := &flightServiceClient{cc: cc}
+	return client
+}
+
+// flightServiceHandshakeStreamDesc describes the Handshake RPC as a
+// bidirectional stream (both ServerStreams and ClientStreams are set).
+var flightServiceHandshakeStreamDesc = &grpc.StreamDesc{
+ StreamName: "Handshake",
+ ServerStreams: true,
+ ClientStreams: true,
+}
+
+// Handshake opens the bidirectional Handshake stream and returns a typed
+// wrapper around it. No message is sent here; the caller drives the exchange
+// through Send and Recv on the returned client.
+func (c *flightServiceClient) Handshake(ctx context.Context, opts ...grpc.CallOption) (FlightService_HandshakeClient, error) {
+	cs, err := c.cc.NewStream(ctx, flightServiceHandshakeStreamDesc, "/arrow.flight.protocol.FlightService/Handshake", opts...)
+	if err != nil {
+		return nil, err
+	}
+	return &flightServiceHandshakeClient{ClientStream: cs}, nil
+}
+
+// FlightService_HandshakeClient is the client-side handle for a Handshake
+// stream: Send takes a HandshakeRequest, Recv yields a HandshakeResponse, and
+// the embedded grpc.ClientStream exposes the underlying stream operations.
+type FlightService_HandshakeClient interface {
+ Send(*HandshakeRequest) error
+ Recv() (*HandshakeResponse, error)
+ grpc.ClientStream
+}
+
+// flightServiceHandshakeClient adds typed Send/Recv helpers on top of the
+// embedded grpc.ClientStream.
+type flightServiceHandshakeClient struct {
+	grpc.ClientStream
+}
+
+// Send forwards m to the underlying stream's SendMsg.
+func (x *flightServiceHandshakeClient) Send(m *HandshakeRequest) error {
+	return x.SendMsg(m)
+}
+
+// Recv reads the next HandshakeResponse from the stream. When RecvMsg fails,
+// the partially filled message is dropped and only the error is returned.
+func (x *flightServiceHandshakeClient) Recv() (*HandshakeResponse, error) {
+	resp := &HandshakeResponse{}
+	err := x.RecvMsg(resp)
+	if err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
+// flightServiceListFlightsStreamDesc describes the ListFlights RPC as a
+// server-streaming call (only ServerStreams is set).
+var flightServiceListFlightsStreamDesc = &grpc.StreamDesc{
+ StreamName: "ListFlights",
+ ServerStreams: true,
+}
+
+// ListFlights starts the server-streaming ListFlights call: it opens the
+// stream, sends the single Criteria request, closes the send side, and returns
+// a client from which the FlightInfo responses can be received. Any error from
+// those three steps is returned as-is with a nil client.
+func (c *flightServiceClient) ListFlights(ctx context.Context, in *Criteria, opts ...grpc.CallOption) (FlightService_ListFlightsClient, error) {
+	cs, err := c.cc.NewStream(ctx, flightServiceListFlightsStreamDesc, "/arrow.flight.protocol.FlightService/ListFlights", opts...)
+	if err != nil {
+		return nil, err
+	}
+	// One request only, so send it and half-close straight away.
+	if err = cs.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err = cs.CloseSend(); err != nil {
+		return nil, err
+	}
+	return &flightServiceListFlightsClient{ClientStream: cs}, nil
+}
+
+// FlightService_ListFlightsClient is the client-side handle for a ListFlights
+// stream: Recv yields FlightInfo messages, and the embedded grpc.ClientStream
+// exposes the underlying stream operations.
+type FlightService_ListFlightsClient interface {
+ Recv() (*FlightInfo, error)
+ grpc.ClientStream
+}
+
+// flightServiceListFlightsClient adds a typed Recv helper on top of the
+// embedded grpc.ClientStream.
+type flightServiceListFlightsClient struct {
+	grpc.ClientStream
+}
+
+// Recv reads the next FlightInfo from the stream. When RecvMsg fails, the
+// partially filled message is dropped and only the error is returned.
+func (x *flightServiceListFlightsClient) Recv() (*FlightInfo, error) {
+	info := &FlightInfo{}
+	err := x.RecvMsg(info)
+	if err != nil {
+		return nil, err
+	}
+	return info, nil
+}
+
+// flightServiceGetFlightInfoStreamDesc names the GetFlightInfo RPC; neither
+// streaming flag is set. The GetFlightInfo client method below calls Invoke
+// directly and does not reference this descriptor.
+var flightServiceGetFlightInfoStreamDesc = &grpc.StreamDesc{
+ StreamName: "GetFlightInfo",
+}
+
+// GetFlightInfo performs the unary GetFlightInfo call for the given
+// descriptor and returns the decoded FlightInfo, or a nil result together with
+// the error reported by Invoke.
+func (c *flightServiceClient) GetFlightInfo(ctx context.Context, in *FlightDescriptor, opts ...grpc.CallOption) (*FlightInfo, error) {
+	info := &FlightInfo{}
+	if err := c.cc.Invoke(ctx, "/arrow.flight.protocol.FlightService/GetFlightInfo", in, info, opts...); err != nil {
+		return nil, err
+	}
+	return info, nil
+}
+
+// flightServiceGetSchemaStreamDesc names the GetSchema RPC; neither streaming
+// flag is set. The GetSchema client method below calls Invoke directly and
+// does not reference this descriptor.
+var flightServiceGetSchemaStreamDesc = &grpc.StreamDesc{
+ StreamName: "GetSchema",
+}
+
+// GetSchema performs the unary GetSchema call for the given descriptor and
+// returns the decoded SchemaResult, or a nil result together with the error
+// reported by Invoke.
+func (c *flightServiceClient) GetSchema(ctx context.Context, in *FlightDescriptor, opts ...grpc.CallOption) (*SchemaResult, error) {
+	result := &SchemaResult{}
+	if err := c.cc.Invoke(ctx, "/arrow.flight.protocol.FlightService/GetSchema", in, result, opts...); err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// flightServiceDoGetStreamDesc describes the DoGet RPC as a server-streaming
+// call (only ServerStreams is set).
+var flightServiceDoGetStreamDesc = &grpc.StreamDesc{
+ StreamName: "DoGet",
+ ServerStreams: true,
+}
+
+// DoGet starts the server-streaming DoGet call: it opens the stream, sends the
+// single Ticket request, closes the send side, and returns a client from which
+// the FlightData responses can be received. Any error from those three steps
+// is returned as-is with a nil client.
+func (c *flightServiceClient) DoGet(ctx context.Context, in *Ticket, opts ...grpc.CallOption) (FlightService_DoGetClient, error) {
+	cs, err := c.cc.NewStream(ctx, flightServiceDoGetStreamDesc, "/arrow.flight.protocol.FlightService/DoGet", opts...)
+	if err != nil {
+		return nil, err
+	}
+	// One request only, so send it and half-close straight away.
+	if err = cs.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err = cs.CloseSend(); err != nil {
+		return nil, err
+	}
+	return &flightServiceDoGetClient{ClientStream: cs}, nil
+}
+
+// FlightService_DoGetClient is the client-side handle for a DoGet stream:
+// Recv yields FlightData messages, and the embedded grpc.ClientStream exposes
+// the underlying stream operations.
+type FlightService_DoGetClient interface {
+ Recv() (*FlightData, error)
+ grpc.ClientStream
+}
+
+// flightServiceDoGetClient adds a typed Recv helper on top of the embedded
+// grpc.ClientStream.
+type flightServiceDoGetClient struct {
+	grpc.ClientStream
+}
+
+// Recv reads the next FlightData from the stream. When RecvMsg fails, the
+// partially filled message is dropped and only the error is returned.
+func (x *flightServiceDoGetClient) Recv() (*FlightData, error) {
+	data := &FlightData{}
+	err := x.RecvMsg(data)
+	if err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
+// flightServiceDoPutStreamDesc describes the DoPut RPC as a bidirectional
+// stream (both ServerStreams and ClientStreams are set).
+var flightServiceDoPutStreamDesc = &grpc.StreamDesc{
+ StreamName: "DoPut",
+ ServerStreams: true,
+ ClientStreams: true,
+}
+
+// DoPut opens the bidirectional DoPut stream and returns a typed wrapper
+// around it. No message is sent here; the caller uploads FlightData with Send
+// and reads PutResult acknowledgements with Recv.
+func (c *flightServiceClient) DoPut(ctx context.Context, opts ...grpc.CallOption) (FlightService_DoPutClient, error) {
+	cs, err := c.cc.NewStream(ctx, flightServiceDoPutStreamDesc, "/arrow.flight.protocol.FlightService/DoPut", opts...)
+	if err != nil {
+		return nil, err
+	}
+	return &flightServiceDoPutClient{ClientStream: cs}, nil
+}
+
+// FlightService_DoPutClient is the client-side handle for a DoPut stream:
+// Send takes FlightData, Recv yields PutResult messages, and the embedded
+// grpc.ClientStream exposes the underlying stream operations.
+type FlightService_DoPutClient interface {
+ Send(*FlightData) error
+ Recv() (*PutResult, error)
+ grpc.ClientStream
+}
+
+// flightServiceDoPutClient adds typed Send/Recv helpers on top of the embedded
+// grpc.ClientStream.
+type flightServiceDoPutClient struct {
+	grpc.ClientStream
+}
+
+// Send forwards m to the underlying stream's SendMsg.
+func (x *flightServiceDoPutClient) Send(m *FlightData) error {
+	return x.SendMsg(m)
+}
+
+// Recv reads the next PutResult from the stream. When RecvMsg fails, the
+// partially filled message is dropped and only the error is returned.
+func (x *flightServiceDoPutClient) Recv() (*PutResult, error) {
+	res := &PutResult{}
+	err := x.RecvMsg(res)
+	if err != nil {
+		return nil, err
+	}
+	return res, nil
+}
+
+// flightServiceDoExchangeStreamDesc describes the DoExchange RPC as a
+// bidirectional stream (both ServerStreams and ClientStreams are set).
+var flightServiceDoExchangeStreamDesc = &grpc.StreamDesc{
+ StreamName: "DoExchange",
+ ServerStreams: true,
+ ClientStreams: true,
+}
+
+// DoExchange opens the bidirectional DoExchange stream and returns a typed
+// wrapper around it. No message is sent here; the caller exchanges FlightData
+// in both directions through Send and Recv on the returned client.
+func (c *flightServiceClient) DoExchange(ctx context.Context, opts ...grpc.CallOption) (FlightService_DoExchangeClient, error) {
+	cs, err := c.cc.NewStream(ctx, flightServiceDoExchangeStreamDesc, "/arrow.flight.protocol.FlightService/DoExchange", opts...)
+	if err != nil {
+		return nil, err
+	}
+	return &flightServiceDoExchangeClient{ClientStream: cs}, nil
+}
+
+type FlightService_DoExchangeClient interface {
+ Send(*FlightData) error
+ Recv() (*FlightData, error)
+ grpc.ClientStream
+}
+
+type flightServiceDoExchangeClient struct {
+ grpc.ClientStream
+}
+
+func (x *flightServiceDoExchangeClient) Send(m *FlightData) error {
+ return x.ClientStream.SendMsg(m)
+}
+
+func (x *flightServiceDoExchangeClient) Recv() (*FlightData, error) {
+ m := new(FlightData)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+// flightServiceDoActionStreamDesc describes DoAction as a server-streaming call.
+var flightServiceDoActionStreamDesc = &grpc.StreamDesc{
+	ServerStreams: true,
+	StreamName:    "DoAction",
+}
+
+// DoAction opens a DoAction stream, sends the single Action request, closes
+// the send side, and returns a typed wrapper for reading the results. A failure
+// in any of those steps is returned with a nil stream.
+func (c *flightServiceClient) DoAction(ctx context.Context, in *Action, opts ...grpc.CallOption) (FlightService_DoActionClient, error) {
+	const method = "/arrow.flight.protocol.FlightService/DoAction"
+	cs, err := c.cc.NewStream(ctx, flightServiceDoActionStreamDesc, method, opts...)
+	if err != nil {
+		return nil, err
+	}
+	if err = cs.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err = cs.CloseSend(); err != nil {
+		return nil, err
+	}
+	return &flightServiceDoActionClient{cs}, nil
+}
+
+// FlightService_DoActionClient is the client-side handle for a DoAction call:
+// it yields Result messages.
+type FlightService_DoActionClient interface {
+	grpc.ClientStream
+	Recv() (*Result, error)
+}
+
+// flightServiceDoActionClient implements FlightService_DoActionClient on top
+// of an embedded grpc.ClientStream.
+type flightServiceDoActionClient struct {
+	grpc.ClientStream
+}
+
+// Recv reads the next Result from the stream. An error reported by RecvMsg is
+// returned unchanged together with a nil message.
+func (a *flightServiceDoActionClient) Recv() (*Result, error) {
+	res := &Result{}
+	err := a.ClientStream.RecvMsg(res)
+	if err != nil {
+		return nil, err
+	}
+	return res, nil
+}
+
+// flightServiceListActionsStreamDesc describes ListActions as a
+// server-streaming call.
+var flightServiceListActionsStreamDesc = &grpc.StreamDesc{
+	ServerStreams: true,
+	StreamName:    "ListActions",
+}
+
+// ListActions opens a ListActions stream, sends the single Empty request,
+// closes the send side, and returns a typed wrapper for reading the action
+// types. A failure in any of those steps is returned with a nil stream.
+func (c *flightServiceClient) ListActions(ctx context.Context, in *Empty, opts ...grpc.CallOption) (FlightService_ListActionsClient, error) {
+	const method = "/arrow.flight.protocol.FlightService/ListActions"
+	cs, err := c.cc.NewStream(ctx, flightServiceListActionsStreamDesc, method, opts...)
+	if err != nil {
+		return nil, err
+	}
+	if err = cs.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err = cs.CloseSend(); err != nil {
+		return nil, err
+	}
+	return &flightServiceListActionsClient{cs}, nil
+}
+
+// FlightService_ListActionsClient is the client-side handle for a ListActions
+// call: it yields ActionType messages.
+type FlightService_ListActionsClient interface {
+	grpc.ClientStream
+	Recv() (*ActionType, error)
+}
+
+// flightServiceListActionsClient implements FlightService_ListActionsClient on
+// top of an embedded grpc.ClientStream.
+type flightServiceListActionsClient struct {
+	grpc.ClientStream
+}
+
+// Recv reads the next ActionType from the stream. An error reported by RecvMsg
+// is returned unchanged together with a nil message.
+func (l *flightServiceListActionsClient) Recv() (*ActionType, error) {
+	at := &ActionType{}
+	err := l.ClientStream.RecvMsg(at)
+	if err != nil {
+		return nil, err
+	}
+	return at, nil
+}
+
+// FlightServiceService is the service API for the FlightService service,
+// expressed as one function-valued field per RPC.
+// Fields should be assigned to their respective handler implementations only before
+// RegisterFlightServiceService is called. Any unassigned fields will result in the
+// handler for that method returning an Unimplemented error.
+type FlightServiceService struct {
+ //
+ // Handshake between client and server. Depending on the server, the
+ // handshake may be required to determine the token that should be used for
+ // future operations. Both request and response are streams to allow multiple
+ // round-trips depending on auth mechanism.
+ Handshake func(FlightService_HandshakeServer) error
+ //
+ // Get a list of available streams given a particular criteria. Most flight
+ // services will expose one or more streams that are readily available for
+ // retrieval. This api allows listing the streams available for
+ // consumption. A user can also provide a criteria. The criteria can limit
+ // the subset of streams that can be listed via this interface. Each flight
+ // service allows its own definition of how to consume criteria.
+ ListFlights func(*Criteria, FlightService_ListFlightsServer) error
+ //
+ // For a given FlightDescriptor, get information about how the flight can be
+ // consumed. This is a useful interface if the consumer of the interface
+ // already can identify the specific flight to consume. This interface can
+ // also allow a consumer to generate a flight stream through a specified
+ // descriptor. For example, a flight descriptor might be something that
+ // includes a SQL statement or a Pickled Python operation that will be
+ // executed. In those cases, the descriptor will not be previously available
+ // within the list of available streams provided by ListFlights but will be
+ // available for consumption for the duration defined by the specific flight
+ // service.
+ GetFlightInfo func(context.Context, *FlightDescriptor) (*FlightInfo, error)
+ //
+ // For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema.
+ // This is used when a consumer needs the Schema of a flight stream. Similar to
+ // GetFlightInfo, this interface may generate a new flight that was not previously
+ // available in ListFlights.
+ GetSchema func(context.Context, *FlightDescriptor) (*SchemaResult, error)
+ //
+ // Retrieve a single stream associated with a particular descriptor
+ // associated with the referenced ticket. A Flight can be composed of one or
+ // more streams where each stream can be retrieved using a separate opaque
+ // ticket that the flight service uses for managing a collection of streams.
+ DoGet func(*Ticket, FlightService_DoGetServer) error
+ //
+ // Push a stream to the flight service associated with a particular
+ // flight stream. This allows a client of a flight service to upload a stream
+ // of data. Depending on the particular flight service, a client consumer
+ // could be allowed to upload a single stream per descriptor or an unlimited
+ // number. In the latter, the service might implement a 'seal' action that
+ // can be applied to a descriptor once all streams are uploaded.
+ DoPut func(FlightService_DoPutServer) error
+ //
+ // Open a bidirectional data channel for a given descriptor. This
+ // allows clients to send and receive arbitrary Arrow data and
+ // application-specific metadata in a single logical stream. In
+ // contrast to DoGet/DoPut, this is more suited for clients
+ // offloading computation (rather than storage) to a Flight service.
+ DoExchange func(FlightService_DoExchangeServer) error
+ //
+ // Flight services can support an arbitrary number of simple actions in
+ // addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
+ // operations that are potentially available. DoAction allows a flight client
+ // to do a specific action against a flight service. An action includes
+ // opaque request and response objects that are specific to the type of
+ // action being undertaken.
+ DoAction func(*Action, FlightService_DoActionServer) error
+ //
+ // A flight service exposes all of the available action types that it has
+ // along with descriptions. This allows different flight consumers to
+ // understand the capabilities of the flight service.
+ ListActions func(*Empty, FlightService_ListActionsServer) error
+}
+
+// handshake adapts the raw gRPC server stream to the typed Handshake handler.
+func (s *FlightServiceService) handshake(_ interface{}, stream grpc.ServerStream) error {
+	typed := &flightServiceHandshakeServer{stream}
+	return s.Handshake(typed)
+}
+// listFlights reads the single Criteria request from the stream and passes it,
+// with a typed stream wrapper, to the ListFlights handler.
+func (s *FlightServiceService) listFlights(_ interface{}, stream grpc.ServerStream) error {
+	criteria := &Criteria{}
+	err := stream.RecvMsg(criteria)
+	if err != nil {
+		return err
+	}
+	return s.ListFlights(criteria, &flightServiceListFlightsServer{stream})
+}
+// getFlightInfo is the unary handler for GetFlightInfo. It decodes the
+// FlightDescriptor request with dec and calls the GetFlightInfo handler,
+// routing the call through interceptor when one is supplied.
+func (s *FlightServiceService) getFlightInfo(_ interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	desc := &FlightDescriptor{}
+	if err := dec(desc); err != nil {
+		return nil, err
+	}
+	if interceptor != nil {
+		return interceptor(ctx, desc,
+			&grpc.UnaryServerInfo{
+				Server:     s,
+				FullMethod: "/arrow.flight.protocol.FlightService/GetFlightInfo",
+			},
+			func(ctx context.Context, req interface{}) (interface{}, error) {
+				return s.GetFlightInfo(ctx, req.(*FlightDescriptor))
+			})
+	}
+	return s.GetFlightInfo(ctx, desc)
+}
+// getSchema is the unary handler for GetSchema. It decodes the
+// FlightDescriptor request with dec and calls the GetSchema handler, routing
+// the call through interceptor when one is supplied.
+func (s *FlightServiceService) getSchema(_ interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	desc := &FlightDescriptor{}
+	if err := dec(desc); err != nil {
+		return nil, err
+	}
+	if interceptor != nil {
+		return interceptor(ctx, desc,
+			&grpc.UnaryServerInfo{
+				Server:     s,
+				FullMethod: "/arrow.flight.protocol.FlightService/GetSchema",
+			},
+			func(ctx context.Context, req interface{}) (interface{}, error) {
+				return s.GetSchema(ctx, req.(*FlightDescriptor))
+			})
+	}
+	return s.GetSchema(ctx, desc)
+}
+// doGet reads the single Ticket request from the stream and passes it, with a
+// typed stream wrapper, to the DoGet handler.
+func (s *FlightServiceService) doGet(_ interface{}, stream grpc.ServerStream) error {
+	ticket := &Ticket{}
+	err := stream.RecvMsg(ticket)
+	if err != nil {
+		return err
+	}
+	return s.DoGet(ticket, &flightServiceDoGetServer{stream})
+}
+// doPut adapts the raw gRPC server stream to the typed DoPut handler.
+func (s *FlightServiceService) doPut(_ interface{}, stream grpc.ServerStream) error {
+	typed := &flightServiceDoPutServer{stream}
+	return s.DoPut(typed)
+}
+// doExchange adapts the raw gRPC server stream to the typed DoExchange handler.
+func (s *FlightServiceService) doExchange(_ interface{}, stream grpc.ServerStream) error {
+	typed := &flightServiceDoExchangeServer{stream}
+	return s.DoExchange(typed)
+}
+// doAction reads the single Action request from the stream and passes it, with
+// a typed stream wrapper, to the DoAction handler.
+func (s *FlightServiceService) doAction(_ interface{}, stream grpc.ServerStream) error {
+	action := &Action{}
+	err := stream.RecvMsg(action)
+	if err != nil {
+		return err
+	}
+	return s.DoAction(action, &flightServiceDoActionServer{stream})
+}
+// listActions reads the single Empty request from the stream and passes it,
+// with a typed stream wrapper, to the ListActions handler.
+func (s *FlightServiceService) listActions(_ interface{}, stream grpc.ServerStream) error {
+	empty := &Empty{}
+	err := stream.RecvMsg(empty)
+	if err != nil {
+		return err
+	}
+	return s.ListActions(empty, &flightServiceListActionsServer{stream})
+}
+
+// FlightService_HandshakeServer is the server-side handle for a Handshake
+// call: it receives HandshakeRequest messages and sends HandshakeResponse
+// messages.
+type FlightService_HandshakeServer interface {
+	grpc.ServerStream
+	Send(*HandshakeResponse) error
+	Recv() (*HandshakeRequest, error)
+}
+
+// flightServiceHandshakeServer implements FlightService_HandshakeServer on top
+// of an embedded grpc.ServerStream.
+type flightServiceHandshakeServer struct {
+	grpc.ServerStream
+}
+
+// Send writes one HandshakeResponse to the stream.
+func (h *flightServiceHandshakeServer) Send(resp *HandshakeResponse) error {
+	return h.ServerStream.SendMsg(resp)
+}
+
+// Recv reads the next HandshakeRequest from the stream. An error reported by
+// RecvMsg is returned unchanged together with a nil message.
+func (h *flightServiceHandshakeServer) Recv() (*HandshakeRequest, error) {
+	req := &HandshakeRequest{}
+	err := h.ServerStream.RecvMsg(req)
+	if err != nil {
+		return nil, err
+	}
+	return req, nil
+}
+
+// FlightService_ListFlightsServer is the server-side handle for a ListFlights
+// call: it sends FlightInfo messages.
+type FlightService_ListFlightsServer interface {
+	grpc.ServerStream
+	Send(*FlightInfo) error
+}
+
+// flightServiceListFlightsServer implements FlightService_ListFlightsServer on
+// top of an embedded grpc.ServerStream.
+type flightServiceListFlightsServer struct {
+	grpc.ServerStream
+}
+
+// Send writes one FlightInfo to the stream.
+func (l *flightServiceListFlightsServer) Send(info *FlightInfo) error {
+	return l.ServerStream.SendMsg(info)
+}
+
+// FlightService_DoGetServer is the server-side handle for a DoGet call: it
+// sends FlightData messages.
+type FlightService_DoGetServer interface {
+	grpc.ServerStream
+	Send(*FlightData) error
+}
+
+// flightServiceDoGetServer implements FlightService_DoGetServer on top of an
+// embedded grpc.ServerStream.
+type flightServiceDoGetServer struct {
+	grpc.ServerStream
+}
+
+// Send writes one FlightData message to the stream.
+func (g *flightServiceDoGetServer) Send(data *FlightData) error {
+	return g.ServerStream.SendMsg(data)
+}
+
+// FlightService_DoPutServer is the server-side handle for a DoPut call: it
+// receives FlightData messages and sends PutResult messages.
+type FlightService_DoPutServer interface {
+	grpc.ServerStream
+	Send(*PutResult) error
+	Recv() (*FlightData, error)
+}
+
+// flightServiceDoPutServer implements FlightService_DoPutServer on top of an
+// embedded grpc.ServerStream.
+type flightServiceDoPutServer struct {
+	grpc.ServerStream
+}
+
+// Send writes one PutResult to the stream.
+func (p *flightServiceDoPutServer) Send(res *PutResult) error {
+	return p.ServerStream.SendMsg(res)
+}
+
+// Recv reads the next FlightData message from the stream. An error reported by
+// RecvMsg is returned unchanged together with a nil message.
+func (p *flightServiceDoPutServer) Recv() (*FlightData, error) {
+	data := &FlightData{}
+	err := p.ServerStream.RecvMsg(data)
+	if err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
+// FlightService_DoExchangeServer is the server-side handle for a DoExchange
+// call: FlightData messages flow in both directions.
+type FlightService_DoExchangeServer interface {
+	grpc.ServerStream
+	Send(*FlightData) error
+	Recv() (*FlightData, error)
+}
+
+// flightServiceDoExchangeServer implements FlightService_DoExchangeServer on
+// top of an embedded grpc.ServerStream.
+type flightServiceDoExchangeServer struct {
+	grpc.ServerStream
+}
+
+// Send writes one FlightData message to the stream.
+func (e *flightServiceDoExchangeServer) Send(data *FlightData) error {
+	return e.ServerStream.SendMsg(data)
+}
+
+// Recv reads the next FlightData message from the stream. An error reported by
+// RecvMsg is returned unchanged together with a nil message.
+func (e *flightServiceDoExchangeServer) Recv() (*FlightData, error) {
+	data := &FlightData{}
+	err := e.ServerStream.RecvMsg(data)
+	if err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
+// FlightService_DoActionServer is the server-side handle for a DoAction call:
+// it sends Result messages.
+type FlightService_DoActionServer interface {
+	grpc.ServerStream
+	Send(*Result) error
+}
+
+// flightServiceDoActionServer implements FlightService_DoActionServer on top
+// of an embedded grpc.ServerStream.
+type flightServiceDoActionServer struct {
+	grpc.ServerStream
+}
+
+// Send writes one Result to the stream.
+func (a *flightServiceDoActionServer) Send(res *Result) error {
+	return a.ServerStream.SendMsg(res)
+}
+
+// FlightService_ListActionsServer is the server-side handle for a ListActions
+// call: it sends ActionType messages.
+type FlightService_ListActionsServer interface {
+	grpc.ServerStream
+	Send(*ActionType) error
+}
+
+// flightServiceListActionsServer implements FlightService_ListActionsServer on
+// top of an embedded grpc.ServerStream.
+type flightServiceListActionsServer struct {
+	grpc.ServerStream
+}
+
+// Send writes one ActionType to the stream.
+func (l *flightServiceListActionsServer) Send(at *ActionType) error {
+	return l.ServerStream.SendMsg(at)
+}
+
+// RegisterFlightServiceService registers a service implementation with a gRPC server.
+//
+// It works on a private copy of *srv: every handler field that is nil on the
+// copy is replaced with a stub returning a codes.Unimplemented status, and the
+// handlers placed in the service descriptor are bound to that copy, not to srv.
+func RegisterFlightServiceService(s grpc.ServiceRegistrar, srv *FlightServiceService) {
+	impl := *srv
+
+	// Fill in Unimplemented stubs for every handler the caller left unset.
+	if impl.Handshake == nil {
+		impl.Handshake = func(FlightService_HandshakeServer) error {
+			return status.Error(codes.Unimplemented, "method Handshake not implemented")
+		}
+	}
+	if impl.ListFlights == nil {
+		impl.ListFlights = func(*Criteria, FlightService_ListFlightsServer) error {
+			return status.Error(codes.Unimplemented, "method ListFlights not implemented")
+		}
+	}
+	if impl.GetFlightInfo == nil {
+		impl.GetFlightInfo = func(context.Context, *FlightDescriptor) (*FlightInfo, error) {
+			return nil, status.Error(codes.Unimplemented, "method GetFlightInfo not implemented")
+		}
+	}
+	if impl.GetSchema == nil {
+		impl.GetSchema = func(context.Context, *FlightDescriptor) (*SchemaResult, error) {
+			return nil, status.Error(codes.Unimplemented, "method GetSchema not implemented")
+		}
+	}
+	if impl.DoGet == nil {
+		impl.DoGet = func(*Ticket, FlightService_DoGetServer) error {
+			return status.Error(codes.Unimplemented, "method DoGet not implemented")
+		}
+	}
+	if impl.DoPut == nil {
+		impl.DoPut = func(FlightService_DoPutServer) error {
+			return status.Error(codes.Unimplemented, "method DoPut not implemented")
+		}
+	}
+	if impl.DoExchange == nil {
+		impl.DoExchange = func(FlightService_DoExchangeServer) error {
+			return status.Error(codes.Unimplemented, "method DoExchange not implemented")
+		}
+	}
+	if impl.DoAction == nil {
+		impl.DoAction = func(*Action, FlightService_DoActionServer) error {
+			return status.Error(codes.Unimplemented, "method DoAction not implemented")
+		}
+	}
+	if impl.ListActions == nil {
+		impl.ListActions = func(*Empty, FlightService_ListActionsServer) error {
+			return status.Error(codes.Unimplemented, "method ListActions not implemented")
+		}
+	}
+
+	// Unary RPCs.
+	unary := []grpc.MethodDesc{
+		{MethodName: "GetFlightInfo", Handler: impl.getFlightInfo},
+		{MethodName: "GetSchema", Handler: impl.getSchema},
+	}
+	// Streaming RPCs, in the same order as the service definition.
+	streams := []grpc.StreamDesc{
+		{StreamName: "Handshake", Handler: impl.handshake, ServerStreams: true, ClientStreams: true},
+		{StreamName: "ListFlights", Handler: impl.listFlights, ServerStreams: true},
+		{StreamName: "DoGet", Handler: impl.doGet, ServerStreams: true},
+		{StreamName: "DoPut", Handler: impl.doPut, ServerStreams: true, ClientStreams: true},
+		{StreamName: "DoExchange", Handler: impl.doExchange, ServerStreams: true, ClientStreams: true},
+		{StreamName: "DoAction", Handler: impl.doAction, ServerStreams: true},
+		{StreamName: "ListActions", Handler: impl.listActions, ServerStreams: true},
+	}
+
+	desc := grpc.ServiceDesc{
+		ServiceName: "arrow.flight.protocol.FlightService",
+		Methods:     unary,
+		Streams:     streams,
+		Metadata:    "Flight.proto",
+	}
+	s.RegisterService(&desc, nil)
+}
+
+// NewFlightServiceService creates a new FlightServiceService containing the
+// implemented methods of the FlightService service in s. Any unimplemented
+// methods will result in the gRPC server returning an UNIMPLEMENTED status to the client.
+// This includes situations where the method handler is misspelled or has the wrong
+// signature. For this reason, this function should be used with great care and
+// is not recommended to be used by most users.
+func NewFlightServiceService(s interface{}) *FlightServiceService {
+ ns := &FlightServiceService{}
+ if h, ok := s.(interface {
+ Handshake(FlightService_HandshakeServer) error
+ }); ok {
+ ns.Handshake = h.Handshake
+ }
+ if h, ok := s.(interface {
+ ListFlights(*Criteria, FlightService_ListFlightsServer) error
+ }); ok {
+ ns.ListFlights = h.ListFlights
+ }
+ if h, ok := s.(interface {
+ GetFlightInfo(context.Context, *FlightDescriptor) (*FlightInfo, error)
+ }); ok {
+ ns.GetFlightInfo = h.GetFlightInfo
+ }
+ if h, ok := s.(interface {
+ GetSchema(context.Context, *FlightDescriptor) (*SchemaResult, error)
+ }); ok {
+ ns.GetSchema = h.GetSchema
+ }
+ if h, ok := s.(interface {
+ DoGet(*Ticket, FlightService_DoGetServer) error
+ }); ok {
+ ns.DoGet = h.DoGet
+ }
+ if h, ok := s.(interface {
+ DoPut(FlightService_DoPutServer) error
+ }); ok {
+ ns.DoPut = h.DoPut
+ }
+ if h, ok := s.(interface {
+ DoExchange(FlightService_DoExchangeServer) error
+ }); ok {
+ ns.DoExchange = h.DoExchange
+ }
+ if h, ok := s.(interface {
+ DoAction(*Action, FlightService_DoActionServer) error
+ }); ok {
+ ns.DoAction = h.DoAction
+ }
+ if h, ok := s.(interface {
+ ListActions(*Empty, FlightService_ListActionsServer) error
+ }); ok {
+ ns.ListActions = h.ListActions
+ }
+ return ns
+}
+
+// UnstableFlightServiceService is the service API for the FlightService service,
+// expressed as an interface with one method per RPC.
+// New methods may be added to this interface if they are added to the service
+// definition, which is not a backward-compatible change. For this reason,
+// use of this type is not recommended.
+type UnstableFlightServiceService interface {
+ //
+ // Handshake between client and server. Depending on the server, the
+ // handshake may be required to determine the token that should be used for
+ // future operations. Both request and response are streams to allow multiple
+ // round-trips depending on auth mechanism.
+ Handshake(FlightService_HandshakeServer) error
+ //
+ // Get a list of available streams given a particular criteria. Most flight
+ // services will expose one or more streams that are readily available for
+ // retrieval. This api allows listing the streams available for
+ // consumption. A user can also provide a criteria. The criteria can limit
+ // the subset of streams that can be listed via this interface. Each flight
+ // service allows its own definition of how to consume criteria.
+ ListFlights(*Criteria, FlightService_ListFlightsServer) error
+ //
+ // For a given FlightDescriptor, get information about how the flight can be
+ // consumed. This is a useful interface if the consumer of the interface
+ // already can identify the specific flight to consume. This interface can
+ // also allow a consumer to generate a flight stream through a specified
+ // descriptor. For example, a flight descriptor might be something that
+ // includes a SQL statement or a Pickled Python operation that will be
+ // executed. In those cases, the descriptor will not be previously available
+ // within the list of available streams provided by ListFlights but will be
+ // available for consumption for the duration defined by the specific flight
+ // service.
+ GetFlightInfo(context.Context, *FlightDescriptor) (*FlightInfo, error)
+ //
+ // For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema.
+ // This is used when a consumer needs the Schema of a flight stream. Similar to
+ // GetFlightInfo, this interface may generate a new flight that was not previously
+ // available in ListFlights.
+ GetSchema(context.Context, *FlightDescriptor) (*SchemaResult, error)
+ //
+ // Retrieve a single stream associated with a particular descriptor
+ // associated with the referenced ticket. A Flight can be composed of one or
+ // more streams where each stream can be retrieved using a separate opaque
+ // ticket that the flight service uses for managing a collection of streams.
+ DoGet(*Ticket, FlightService_DoGetServer) error
+ //
+ // Push a stream to the flight service associated with a particular
+ // flight stream. This allows a client of a flight service to upload a stream
+ // of data. Depending on the particular flight service, a client consumer
+ // could be allowed to upload a single stream per descriptor or an unlimited
+ // number. In the latter, the service might implement a 'seal' action that
+ // can be applied to a descriptor once all streams are uploaded.
+ DoPut(FlightService_DoPutServer) error
+ //
+ // Open a bidirectional data channel for a given descriptor. This
+ // allows clients to send and receive arbitrary Arrow data and
+ // application-specific metadata in a single logical stream. In
+ // contrast to DoGet/DoPut, this is more suited for clients
+ // offloading computation (rather than storage) to a Flight service.
+ DoExchange(FlightService_DoExchangeServer) error
+ //
+ // Flight services can support an arbitrary number of simple actions in
+ // addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
+ // operations that are potentially available. DoAction allows a flight client
+ // to do a specific action against a flight service. An action includes
+ // opaque request and response objects that are specific to the type of
+ // action being undertaken.
+ DoAction(*Action, FlightService_DoActionServer) error
+ //
+ // A flight service exposes all of the available action types that it has
+ // along with descriptions. This allows different flight consumers to
+ // understand the capabilities of the flight service.
+ ListActions(*Empty, FlightService_ListActionsServer) error
+}
diff --git a/go/arrow/flight/client.go b/go/arrow/flight/client.go
new file mode 100644
index 0000000..3b490a5
--- /dev/null
+++ b/go/arrow/flight/client.go
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flight
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Client is an interface wrapped around the FlightServiceClient that is
+// generated from the grpc protobuf definitions. It hides the authentication
+// handshake behind Authenticate and the ClientAuthHandler, so callers do not
+// have to implement the grpc communication and the sending of the auth token
+// manually.
+type Client interface {
+ // Authenticate uses the ClientAuthHandler that was used when creating the client
+ // in order to use the Handshake endpoints of the service.
+ Authenticate(context.Context, ...grpc.CallOption) error
+ // Close closes the underlying grpc connection and returns any error from
+ // doing so.
+ Close() error
+ // join the interface from the FlightServiceClient instead of re-defining all
+ // the endpoints here.
+ FlightServiceClient
+}
+
+// client is the Client implementation returned by NewFlightClient.
+type client struct {
+ // conn is the connection dialed by NewFlightClient; Close closes it.
+ conn *grpc.ClientConn
+ // authHandler performs the handshake in Authenticate; it may be nil, in
+ // which case Authenticate returns an error.
+ authHandler ClientAuthHandler
+
+ // FlightServiceClient is the generated client bound to conn; Close sets it
+ // to nil.
+ FlightServiceClient
+}
+
+// NewFlightClient dials the grpc server at addr and returns a Client for it.
+// When auth is non-nil, stream and unary interceptors built from it are placed
+// ahead of the caller's dial options. TLS and any other grpc configuration can
+// still be supplied through opts exactly as when dialing manually.
+//
+// Alternatively, a grpc client can be constructed as normal without this helper as the
+// grpc generated client code is still exported. This exists to add utility and helpers
+// around the authentication and passing the token with requests.
+func NewFlightClient(addr string, auth ClientAuthHandler, opts ...grpc.DialOption) (Client, error) {
+	dialOpts := opts
+	if auth != nil {
+		// The auth interceptors go first, followed by the caller's options.
+		dialOpts = make([]grpc.DialOption, 0, len(opts)+2)
+		dialOpts = append(dialOpts,
+			grpc.WithChainStreamInterceptor(createClientAuthStreamInterceptor(auth)),
+			grpc.WithChainUnaryInterceptor(createClientAuthUnaryInterceptor(auth)),
+		)
+		dialOpts = append(dialOpts, opts...)
+	}
+
+	conn, err := grpc.Dial(addr, dialOpts...)
+	if err != nil {
+		return nil, err
+	}
+
+	fc := &client{
+		conn:                conn,
+		authHandler:         auth,
+		FlightServiceClient: NewFlightServiceClient(conn),
+	}
+	return fc, nil
+}
+
+// Authenticate opens a Handshake stream and hands it, wrapped in a
+// clientAuthConn, to the client's auth handler. It returns a codes.NotFound
+// status error when the client was created without an auth handler, the
+// Handshake error if the stream cannot be opened, and otherwise whatever the
+// handler's Authenticate returns.
+func (c *client) Authenticate(ctx context.Context, opts ...grpc.CallOption) error {
+	handler := c.authHandler
+	if handler == nil {
+		return status.Error(codes.NotFound, "cannot authenticate without an auth-handler")
+	}
+
+	hs, err := c.FlightServiceClient.Handshake(ctx, opts...)
+	if err != nil {
+		return err
+	}
+
+	return handler.Authenticate(&clientAuthConn{stream: hs})
+}
+
+// Close drops the embedded FlightServiceClient and then closes the grpc
+// connection, returning the connection's Close error.
+func (c *client) Close() error {
+	conn := c.conn
+	c.FlightServiceClient = nil
+	return conn.Close()
+}
diff --git a/go/arrow/flight/client_auth.go b/go/arrow/flight/client_auth.go
new file mode 100644
index 0000000..c95606b
--- /dev/null
+++ b/go/arrow/flight/client_auth.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flight
+
+import (
+ "context"
+ "strings"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+// ClientAuthHandler defines an interface for the Flight client to perform
+// the authentication handshake. The token that is retrieved from GetToken
+// will be sent as part of the context metadata in subsequent requests after
+// authentication is performed using the key "auth-token-bin".
+type ClientAuthHandler interface {
+ // Authenticate performs the handshake over the given AuthConn. The client
+ // calls it with a connection wrapping the Handshake stream.
+ Authenticate(AuthConn) error
+ // GetToken returns the token to attach to outgoing calls. The client
+ // interceptors call it before each call they decorate; a non-nil error
+ // makes that call fail with an Unauthenticated status.
+ GetToken() (string, error)
+}
+
+// clientAuthConn exposes a Handshake client stream as a byte-oriented
+// connection with Read and Send.
+type clientAuthConn struct {
+	stream FlightService_HandshakeClient
+}
+
+// Read receives the next message from the handshake stream and returns its
+// Payload. A receive error is returned unchanged with a nil payload.
+func (a *clientAuthConn) Read() ([]byte, error) {
+	resp, err := a.stream.Recv()
+	if err != nil {
+		return nil, err
+	}
+
+	payload := resp.Payload
+	return payload, nil
+}
+
+// Send wraps b in a HandshakeRequest and sends it on the handshake stream.
+func (a *clientAuthConn) Send(b []byte) error {
+	req := &HandshakeRequest{Payload: b}
+	return a.stream.Send(req)
+}
+
+// createClientAuthUnaryInterceptor returns a unary client interceptor that
+// attaches the token from auth.GetToken to every call under grpcAuthHeader.
+//
+// The token is appended to the outgoing metadata already present on the call's
+// context, so metadata set by the caller is kept rather than replaced. If
+// GetToken fails, the call is not made and a codes.Unauthenticated status
+// error is returned. A nil auth yields a pass-through interceptor.
+func createClientAuthUnaryInterceptor(auth ClientAuthHandler) grpc.UnaryClientInterceptor {
+	if auth == nil {
+		return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+			return invoker(ctx, method, req, reply, cc, opts...)
+		}
+	}
+
+	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+		tok, err := auth.GetToken()
+		if err != nil {
+			return status.Errorf(codes.Unauthenticated, "error retrieving token: %s", err)
+		}
+
+		// Append rather than overwrite: NewOutgoingContext would discard any
+		// outgoing metadata the caller already attached to ctx.
+		return invoker(metadata.AppendToOutgoingContext(ctx, grpcAuthHeader, tok), method, req, reply, cc, opts...)
+	}
+}
+
+// createClientAuthStreamInterceptor returns a stream client interceptor that
+// attaches the token from auth.GetToken to every stream under grpcAuthHeader,
+// except for methods whose name ends in "/Handshake", which are passed through
+// untouched.
+//
+// The token is appended to the outgoing metadata already present on the call's
+// context, so metadata set by the caller is kept rather than replaced. If
+// GetToken fails, the stream is not opened and a codes.Unauthenticated status
+// error is returned. A nil auth yields a pass-through interceptor.
+func createClientAuthStreamInterceptor(auth ClientAuthHandler) grpc.StreamClientInterceptor {
+	if auth == nil {
+		return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+			return streamer(ctx, desc, cc, method, opts...)
+		}
+	}
+
+	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+		// The handshake is what produces the token, so it goes out without one.
+		if strings.HasSuffix(method, "/Handshake") {
+			return streamer(ctx, desc, cc, method, opts...)
+		}
+
+		tok, err := auth.GetToken()
+		if err != nil {
+			return nil, status.Errorf(codes.Unauthenticated, "error retrieving token: %s", err)
+		}
+
+		// Append rather than overwrite: NewOutgoingContext would discard any
+		// outgoing metadata the caller already attached to ctx.
+		return streamer(metadata.AppendToOutgoingContext(ctx, grpcAuthHeader, tok), desc, cc, method, opts...)
+	}
+}
diff --git a/go/arrow/flight/example_flight_server_test.go b/go/arrow/flight/example_flight_server_test.go
new file mode 100644
index 0000000..3fc1050
--- /dev/null
+++ b/go/arrow/flight/example_flight_server_test.go
@@ -0,0 +1,70 @@
+package flight_test
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "log"
+
+ "github.com/apache/arrow/go/arrow/flight"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// serverAuth is the example server-side auth handler: it prints whatever the
+// client sends and always answers with the token "foobar".
+type serverAuth struct{}
+
+// Authenticate reads one payload from the connection, prints it, and sends the
+// token back. End of stream is reported as Unauthenticated and any other read
+// error as FailedPrecondition.
+func (sa *serverAuth) Authenticate(c flight.AuthConn) error {
+	in, err := c.Read()
+	switch {
+	case err == io.EOF:
+		return status.Error(codes.Unauthenticated, "no auth info provided")
+	case err != nil:
+		return status.Error(codes.FailedPrecondition, "error reading auth handshake")
+	}
+
+	// do something with in....
+	fmt.Println(string(in))
+
+	// send auth token back
+	return c.Send([]byte("foobar"))
+}
+
+// IsValid accepts only the token "foobar", for which it returns the identity
+// "foo"; any other token yields a PermissionDenied status error.
+func (sa *serverAuth) IsValid(token string) (interface{}, error) {
+	if token != "foobar" {
+		return "", status.Error(codes.PermissionDenied, "invalid auth token")
+	}
+	return "foo", nil
+}
+
+// Example_server starts a Flight server that uses serverAuth, connects to it
+// with a plain generated client, and performs one handshake round-trip: the
+// client sends "baz" (which the server prints) and prints the token it gets
+// back.
+func Example_server() {
+	server := flight.NewFlightServer(&serverAuth{})
+	server.Init("localhost:0")
+	server.RegisterFlightService(&flight.FlightServiceService{})
+
+	go server.Serve()
+	defer server.Shutdown()
+
+	conn, err := grpc.Dial(server.Addr().String(), grpc.WithInsecure())
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer conn.Close()
+
+	client := flight.NewFlightServiceClient(conn)
+	stream, err := client.Handshake(context.Background())
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	if err := stream.Send(&flight.HandshakeRequest{Payload: []byte("baz")}); err != nil {
+		log.Fatal(err)
+	}
+
+	// Check the error before touching resp: on failure resp is nil and
+	// reading resp.Payload would panic.
+	resp, err := stream.Recv()
+	if err != nil {
+		log.Fatal(err)
+	}
+	fmt.Println(string(resp.Payload))
+
+	// Output:
+	// baz
+	// foobar
+}
diff --git a/go/arrow/flight/flight_test.go b/go/arrow/flight/flight_test.go
new file mode 100644
index 0000000..ed57bbc
--- /dev/null
+++ b/go/arrow/flight/flight_test.go
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flight_test
+
+import (
+ "context"
+ "errors"
+ "io"
+ "testing"
+
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/flight"
+ "github.com/apache/arrow/go/arrow/internal/arrdata"
+ "github.com/apache/arrow/go/arrow/ipc"
+ "github.com/apache/arrow/go/arrow/memory"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// flightServer supplies the ListFlights, GetSchema and DoGet handlers used by
+// the tests in this file.
+type flightServer struct {
+ // mem is filled in on first use by getmem
+ mem memory.Allocator
+}
+
+// getmem returns the server's allocator, creating a Go allocator the first
+// time it is asked for one.
+func (f *flightServer) getmem() memory.Allocator {
+	if f.mem != nil {
+		return f.mem
+	}
+
+	f.mem = memory.NewGoAllocator()
+	return f.mem
+}
+
+func (f *flightServer) ListFlights(c *flight.Criteria, fs flight.FlightService_ListFlightsServer) error {
+ expr := string(c.GetExpression())
+
+ auth := ""
+ authVal := flight.AuthFromContext(fs.Context())
+ if authVal != nil {
+ auth = authVal.(string)
+ }
+
+ for _, name := range arrdata.RecordNames {
+ if expr != "" && expr != name {
+ continue
+ }
+
+ recs := arrdata.Records[name]
+ totalRows := int64(0)
+ for _, r := range recs {
+ totalRows += r.NumRows()
+ }
+
+ fs.Send(&flight.FlightInfo{
+ Schema: ipc.FlightInfoSchemaBytes(recs[0].Schema(), f.getmem()),
+ FlightDescriptor: &flight.FlightDescriptor{
+ Type: flight.FlightDescriptor_PATH,
+ Path: []string{name, auth},
+ },
+ TotalRecords: totalRows,
+ TotalBytes: -1,
+ })
+ }
+
+ return nil
+}
+
+// GetSchema returns the serialized schema of the arrdata record set named by
+// the first element of the descriptor path. A nil descriptor or an empty path
+// yields InvalidArgument; an unknown name yields NotFound.
+func (f *flightServer) GetSchema(_ context.Context, in *flight.FlightDescriptor) (*flight.SchemaResult, error) {
+	// an empty path would otherwise panic on the index below
+	if in == nil || len(in.GetPath()) == 0 {
+		return nil, status.Error(codes.InvalidArgument, "invalid flight descriptor")
+	}
+
+	recs, ok := arrdata.Records[in.GetPath()[0]]
+	if !ok {
+		return nil, status.Error(codes.NotFound, "flight not found")
+	}
+
+	return &flight.SchemaResult{Schema: ipc.FlightInfoSchemaBytes(recs[0].Schema(), f.getmem())}, nil
+}
+
+// DoGet streams every record of the arrdata record set named by the ticket.
+// An unknown ticket yields NotFound instead of indexing an empty slice, and
+// the first write failure is returned to the caller.
+func (f *flightServer) DoGet(tkt *flight.Ticket, fs flight.FlightService_DoGetServer) error {
+	recs, ok := arrdata.Records[string(tkt.GetTicket())]
+	if !ok || len(recs) == 0 {
+		return status.Error(codes.NotFound, "flight not found")
+	}
+
+	w := ipc.NewFlightDataWriter(fs, ipc.WithSchema(recs[0].Schema()))
+	for _, r := range recs {
+		if err := w.Write(r); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// servAuth is the server-side auth handler handed to flight.NewFlightServer
+// in TestServer.
+type servAuth struct{}
+
+// Authenticate consumes one handshake payload and replies with the token
+// "baz". A stream that is closed before any payload arrives is accepted
+// without a reply; any other read error is passed through.
+func (a *servAuth) Authenticate(c flight.AuthConn) error {
+	if _, err := c.Read(); err != nil {
+		if err == io.EOF {
+			return nil
+		}
+		return err
+	}
+
+	return c.Send([]byte("baz"))
+}
+
+// IsValid maps the token "baz" to the identity "bar" and rejects every other
+// token.
+func (a *servAuth) IsValid(token string) (interface{}, error) {
+	if token != "baz" {
+		return "", errors.New("novalid")
+	}
+	return "bar", nil
+}
+
+// clientAuth is the client-side auth handler handed to flight.NewFlightClient
+// in TestServer.
+type clientAuth struct{}
+
+// Authenticate sends the payload "foobar" and then waits for the server's
+// reply, whose content is discarded; only the error of either step is
+// reported.
+func (a *clientAuth) Authenticate(c flight.AuthConn) error {
+	err := c.Send([]byte("foobar"))
+	if err != nil {
+		return err
+	}
+
+	_, err = c.Read()
+	return err
+}
+
+// GetToken always returns the fixed token "baz", which is the value
+// servAuth.IsValid accepts.
+func (a *clientAuth) GetToken() (string, error) {
+ return "baz", nil
+}
+
+// TestListFlights lists every flight from an unauthenticated server and
+// checks that each returned FlightInfo carries the schema and total row count
+// of the matching arrdata record set.
+func TestListFlights(t *testing.T) {
+	s := flight.NewFlightServer(nil)
+	if err := s.Init("localhost:0"); err != nil {
+		t.Fatal(err)
+	}
+	f := &flightServer{}
+	s.RegisterFlightService(&flight.FlightServiceService{
+		ListFlights: f.ListFlights,
+	})
+
+	go s.Serve()
+	defer s.Shutdown()
+
+	// Setup failures are fatal: carrying on would dereference a nil client
+	// or stream.
+	client, err := flight.NewFlightClient(s.Addr().String(), nil, grpc.WithInsecure())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	flightStream, err := client.ListFlights(context.Background(), &flight.Criteria{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for {
+		info, err := flightStream.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			// info is nil here, so the checks below cannot run
+			t.Fatal(err)
+		}
+
+		path := info.GetFlightDescriptor().GetPath()
+		if len(path) == 0 {
+			t.Fatal("flight info has an empty descriptor path")
+		}
+
+		fname := path[0]
+		recs, ok := arrdata.Records[fname]
+		if !ok {
+			t.Fatalf("got unknown flight info: %s", fname)
+		}
+
+		sc, err := ipc.SchemaFromFlightInfo(info.GetSchema())
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if !recs[0].Schema().Equal(sc) {
+			t.Fatalf("flight info schema transfer failed: \ngot = %#v\nwant = %#v\n", sc, recs[0].Schema())
+		}
+
+		var total int64
+		for _, r := range recs {
+			total += r.NumRows()
+		}
+
+		if info.TotalRecords != total {
+			t.Fatalf("got wrong number of total records: got = %d, wanted = %d", info.TotalRecords, total)
+		}
+	}
+}
+
+// TestGetSchema requests the schema of every arrdata record set by name and
+// compares it with the schema of the first record in that set.
+func TestGetSchema(t *testing.T) {
+	s := flight.NewFlightServer(nil)
+	if err := s.Init("localhost:0"); err != nil {
+		t.Fatal(err)
+	}
+	f := &flightServer{}
+	s.RegisterFlightService(&flight.FlightServiceService{
+		GetSchema: f.GetSchema,
+	})
+
+	go s.Serve()
+	defer s.Shutdown()
+
+	// fatal: the subtests below cannot run without a client
+	client, err := flight.NewFlightClient(s.Addr().String(), nil, grpc.WithInsecure())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	for name, testrecs := range arrdata.Records {
+		t.Run("flight get schema: "+name, func(t *testing.T) {
+			res, err := client.GetSchema(context.Background(), &flight.FlightDescriptor{Path: []string{name}})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			schema, err := ipc.SchemaFromFlightInfo(res.GetSchema())
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if !testrecs[0].Schema().Equal(schema) {
+				t.Fatalf("schema not match: \ngot = %#v\nwant = %#v\n", schema, testrecs[0].Schema())
+			}
+		})
+	}
+}
+
+// TestServer runs an authenticated round trip: handshake, ListFlights for
+// "decimal128" (checking that the authenticated identity reaches the handler)
+// and DoGet, comparing the streamed records with the arrdata originals.
+func TestServer(t *testing.T) {
+	f := &flightServer{}
+	service := &flight.FlightServiceService{
+		ListFlights: f.ListFlights,
+		DoGet:       f.DoGet,
+	}
+
+	s := flight.NewFlightServer(&servAuth{})
+	if err := s.Init("localhost:0"); err != nil {
+		t.Fatal(err)
+	}
+	s.RegisterFlightService(service)
+
+	go s.Serve()
+	defer s.Shutdown()
+
+	// Every setup step is fatal on error: each later step dereferences the
+	// value produced by the one before it.
+	client, err := flight.NewFlightClient(s.Addr().String(), &clientAuth{}, grpc.WithInsecure())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	if err := client.Authenticate(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+
+	fistream, err := client.ListFlights(context.Background(), &flight.Criteria{Expression: []byte("decimal128")})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	fi, err := fistream.Recv()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// print the whole path: indexing element 1 in the failure message would
+	// panic in exactly the case where the length check fails
+	path := fi.GetFlightDescriptor().GetPath()
+	if len(path) != 2 || path[1] != "bar" {
+		t.Fatalf("path should have auth info: want second element %q, got path %v", "bar", path)
+	}
+
+	fdata, err := client.DoGet(context.Background(), &flight.Ticket{Ticket: []byte("decimal128")})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r, err := ipc.NewFlightDataReader(fdata)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	expected := arrdata.Records["decimal128"]
+	idx := 0
+	var numRows int64
+	for {
+		rec, err := r.Read()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			// rec is nil on error, so stop instead of using it
+			t.Fatal(err)
+		}
+
+		if idx >= len(expected) {
+			t.Fatalf("received more than the %d expected records", len(expected))
+		}
+
+		numRows += rec.NumRows()
+		if !array.RecordEqual(expected[idx], rec) {
+			t.Errorf("flight data stream records don't match: \ngot = %#v\nwant = %#v", rec, expected[idx])
+		}
+		idx++
+	}
+
+	if numRows != fi.TotalRecords {
+		t.Fatalf("got %d, want %d", numRows, fi.TotalRecords)
+	}
+}
diff --git a/go/arrow/go.mod b/go/arrow/flight/gen.go
similarity index 73%
copy from go/arrow/go.mod
copy to go/arrow/flight/gen.go
index a83f7cc..8a4b874 100644
--- a/go/arrow/go.mod
+++ b/go/arrow/flight/gen.go
@@ -14,14 +14,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-module github.com/apache/arrow/go/arrow
+package flight // import "github.com/apache/arrow/go/arrow/flight"
-go 1.12
-
-require (
- github.com/davecgh/go-spew v1.1.0 // indirect
- github.com/google/flatbuffers v1.11.0
- github.com/pmezard/go-difflib v1.0.0 // indirect
- github.com/stretchr/testify v1.2.0
- golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
-)
+//go:generate protoc -I../../../format --go_out=. --go-grpc_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative Flight.proto
diff --git a/go/arrow/flight/server.go b/go/arrow/flight/server.go
new file mode 100644
index 0000000..90492a7
--- /dev/null
+++ b/go/arrow/flight/server.go
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flight
+
+import (
+ "net"
+ "os"
+ "os/signal"
+
+ "google.golang.org/grpc"
+)
+
+// Server is an interface for hiding some of the grpc specifics to make
+// it slightly easier to manage a flight service, slightly modeled after
+// the C++ implementation
+type Server interface {
+ // Init takes in the address to bind to and creates the listener. It must be
+ // called before Addr or Serve, both of which use that listener.
+ Init(addr string) error
+ // Addr will return the address that was bound to for the service to listen on
+ Addr() net.Addr
+ // SetShutdownOnSignals sets notifications on the given signals to call GracefulStop
+ // on the grpc service if any of those signals are received
+ SetShutdownOnSignals(sig ...os.Signal)
+ // Serve blocks until accepting a connection fails with a fatal error. It will return
+ // a non-nil error unless it stopped due to calling Shutdown or receiving one of the
+ // signals set in SetShutdownOnSignals
+ Serve() error
+ // Shutdown will call GracefulStop on the grpc server so that it stops accepting connections
+ // and will wait until current methods complete
+ Shutdown()
+ // RegisterFlightService sets up the handler for the Flight Endpoints as per
+ // normal Grpc setups
+ RegisterFlightService(*FlightServiceService)
+}
+
+// server is the Server implementation returned by NewFlightServer.
+type server struct {
+ // lis is the listener created by Init and handed to grpc in Serve
+ lis net.Listener
+ // sigChannel is set by SetShutdownOnSignals; it stays nil when no signals were requested
+ sigChannel <-chan os.Signal
+ // done is created by Serve and closed once the grpc Serve call returns
+ done chan bool
+
+ // authHandler drives the default handshake; nil means no authentication
+ authHandler ServerAuthHandler
+ server *grpc.Server
+}
+
+// NewFlightServer builds a Server around a new grpc server. When auth is
+// non-nil, stream and unary interceptors that validate the auth token are
+// placed ahead of the caller-supplied options; the remaining options (TLS
+// certs and so on) are passed through to grpc unchanged.
+//
+// Using this helper is optional: the generated grpc server code is exported,
+// so a grpc server can also be assembled by hand. The helper only exists to
+// make the utilities in this package available.
+func NewFlightServer(auth ServerAuthHandler, opt ...grpc.ServerOption) Server {
+	if auth != nil {
+		authOpts := []grpc.ServerOption{
+			grpc.ChainStreamInterceptor(createServerAuthStreamInterceptor(auth)),
+			grpc.ChainUnaryInterceptor(createServerAuthUnaryInterceptor(auth)),
+		}
+		opt = append(authOpts, opt...)
+	}
+
+	grpcServer := grpc.NewServer(opt...)
+	return &server{
+		authHandler: auth,
+		server:      grpcServer,
+	}
+}
+
+// Init opens a TCP listener on addr and stores it on the server; the listen
+// error, if any, is returned.
+func (s *server) Init(addr string) (err error) {
+	lis, err := net.Listen("tcp", addr)
+	s.lis = lis
+	return err
+}
+
+// Addr returns the address of the listener created by Init. It dereferences
+// that listener directly, so Init has to have succeeded first.
+func (s *server) Addr() net.Addr {
+ return s.lis.Addr()
+}
+
+// SetShutdownOnSignals subscribes to the given signals on a channel with a
+// buffer of one and stores it on the server; Serve watches that channel and
+// calls GracefulStop when a signal arrives.
+func (s *server) SetShutdownOnSignals(sig ...os.Signal) {
+	notify := make(chan os.Signal, 1)
+	signal.Notify(notify, sig...)
+	s.sigChannel = notify
+}
+
+// Serve runs the grpc server on the listener created by Init and blocks until
+// it stops, returning the error from the grpc Serve call.
+func (s *server) Serve() error {
+ s.done = make(chan bool)
+ // watcher goroutine: stops the grpc server gracefully when a registered
+ // signal arrives, or exits once serving has finished and done is closed
+ go func() {
+ select {
+ case <-s.sigChannel:
+ s.server.GracefulStop()
+ case <-s.done:
+ }
+ }()
+ err := s.server.Serve(s.lis)
+ // closing done releases the watcher goroutine above
+ close(s.done)
+ return err
+}
+
+// RegisterFlightService registers svc with the underlying grpc server. When
+// svc has no Handshake handler, the server's own auth-handler based handshake
+// is written into svc before registration.
+func (s *server) RegisterFlightService(svc *FlightServiceService) {
+ if svc.Handshake == nil {
+ svc.Handshake = s.handshake
+ }
+ RegisterFlightServiceService(s.server, svc)
+}
+
+// Shutdown calls GracefulStop on the underlying grpc server.
+func (s *server) Shutdown() {
+ s.server.GracefulStop()
+}
diff --git a/go/arrow/flight/server_auth.go b/go/arrow/flight/server_auth.go
new file mode 100644
index 0000000..99e6f8e
--- /dev/null
+++ b/go/arrow/flight/server_auth.go
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package flight
+
+import (
+ "context"
+ "strings"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+// grpcAuthHeader is the metadata key under which the auth token travels; the
+// server interceptors read it from the incoming request metadata.
+const grpcAuthHeader = "auth-token-bin"
+
+// AuthConn wraps the stream from grpc for handshakes to simplify handling
+// handshake request and response from the flight.proto forwarding just the
+// payloads and errors instead of having to deal with the handshake request
+// and response protos directly
+type AuthConn interface {
+ // Read returns the payload of the next handshake message from the peer
+ Read() ([]byte, error)
+ // Send transmits the given payload to the peer as a handshake message
+ Send([]byte) error
+}
+
+// serverAuthConn is the server-side AuthConn, backed by the grpc handshake
+// stream.
+type serverAuthConn struct {
+ stream FlightService_HandshakeServer
+}
+
+// Read receives the next handshake request from the stream and returns its
+// payload; a receive error is returned unchanged with a nil payload.
+func (a *serverAuthConn) Read() ([]byte, error) {
+	req, err := a.stream.Recv()
+	if err == nil {
+		return req.Payload, nil
+	}
+
+	return nil, err
+}
+
+// Send wraps b in a HandshakeResponse and sends it on the stream.
+func (a *serverAuthConn) Send(b []byte) error {
+ return a.stream.Send(&HandshakeResponse{Payload: b})
+}
+
+// ServerAuthHandler defines an interface for the server to perform the handshake.
+// The token is expected to be sent as part of the context metadata in subsequent
+// requests with a key of "auth-token-bin" which will then call IsValid to validate
+type ServerAuthHandler interface {
+ // Authenticate runs the handshake exchange over the given connection
+ Authenticate(AuthConn) error
+ // IsValid checks a request's token; the value it returns is stored in the
+ // request context and can be retrieved with AuthFromContext
+ IsValid(token string) (interface{}, error)
+}
+
+// authCtxKey is the context key under which the value returned by IsValid is
+// stored.
+type authCtxKey struct{}
+
+// authWrappedStream wraps a grpc.ServerStream so that its Context method
+// returns ctx instead of the wrapped stream's own context.
+type authWrappedStream struct {
+ grpc.ServerStream
+ ctx context.Context
+}
+
+// Context returns the replacement context stored on the wrapper.
+func (a *authWrappedStream) Context() context.Context { return a.ctx }
+
+// AuthFromContext will return back whatever object was returned from `IsValid` for a
+// given request context allowing handlers to retrieve identifying information
+// for the current request for use. The result is nil when the context holds
+// no such value.
+func AuthFromContext(ctx context.Context) interface{} {
+ return ctx.Value(authCtxKey{})
+}
+
+// createServerAuthUnaryInterceptor returns a unary interceptor that reads the
+// auth token from the incoming metadata, validates it with auth.IsValid and,
+// on success, calls the handler with a context carrying the returned
+// identity (see AuthFromContext). A failed validation is reported as
+// PermissionDenied. With a nil auth the interceptor is a plain pass-through.
+//
+// NOTE(review): the stream interceptor reports the same failure as
+// Unauthenticated; confirm whether the two codes are meant to differ.
+func createServerAuthUnaryInterceptor(auth ServerAuthHandler) grpc.UnaryServerInterceptor {
+	if auth != nil {
+		return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+			// a missing header leaves the token empty and lets IsValid decide
+			var authTok string
+			if md, ok := metadata.FromIncomingContext(ctx); ok {
+				if vals := md.Get(grpcAuthHeader); len(vals) > 0 {
+					authTok = vals[0]
+				}
+			}
+
+			peerIdentity, err := auth.IsValid(authTok)
+			if err != nil {
+				return nil, status.Errorf(codes.PermissionDenied, "auth-error: %s", err)
+			}
+
+			authedCtx := context.WithValue(ctx, authCtxKey{}, peerIdentity)
+			return handler(authedCtx, req)
+		}
+	}
+
+	return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+		return handler(ctx, req)
+	}
+}
+
+// createServerAuthStreamInterceptor returns a stream interceptor that lets
+// Handshake calls through untouched and, for every other method, validates
+// the auth token from the incoming metadata with auth.IsValid. On success the
+// handler receives a stream whose context carries the returned identity (see
+// AuthFromContext); a failed validation is reported as Unauthenticated. With
+// a nil auth the interceptor is a plain pass-through.
+func createServerAuthStreamInterceptor(auth ServerAuthHandler) grpc.StreamServerInterceptor {
+	if auth != nil {
+		return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+			// the handshake is where the token is obtained, so it cannot require one
+			if strings.HasSuffix(info.FullMethod, "/Handshake") {
+				return handler(srv, stream)
+			}
+
+			// a missing header leaves the token empty and lets IsValid decide
+			var authTok string
+			if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
+				if vals := md.Get(grpcAuthHeader); len(vals) > 0 {
+					authTok = vals[0]
+				}
+			}
+
+			peerIdentity, err := auth.IsValid(authTok)
+			if err != nil {
+				return status.Errorf(codes.Unauthenticated, "auth-error: %s", err)
+			}
+
+			authedCtx := context.WithValue(stream.Context(), authCtxKey{}, peerIdentity)
+			return handler(srv, &authWrappedStream{ServerStream: stream, ctx: authedCtx})
+		}
+	}
+
+	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		return handler(srv, stream)
+	}
+}
+
+// handshake is the default Handshake handler: it hands the stream, wrapped as
+// an AuthConn, to the configured auth handler, and succeeds immediately when
+// no auth handler is configured.
+func (s *server) handshake(stream FlightService_HandshakeServer) error {
+	if handler := s.authHandler; handler != nil {
+		return handler.Authenticate(&serverAuthConn{stream})
+	}
+
+	return nil
+}
diff --git a/go/arrow/go.mod b/go/arrow/go.mod
index a83f7cc..ca76028 100644
--- a/go/arrow/go.mod
+++ b/go/arrow/go.mod
@@ -20,8 +20,16 @@ go 1.12
require (
github.com/davecgh/go-spew v1.1.0 // indirect
+ github.com/golang/protobuf v1.4.2
github.com/google/flatbuffers v1.11.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.2.0
+ golang.org/x/net v0.0.0-20200904194848-62affa334b73 // indirect
+ golang.org/x/sys v0.0.0-20200909081042-eff7692f9009 // indirect
+ golang.org/x/text v0.3.3 // indirect
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
+ google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f // indirect
+ google.golang.org/grpc v1.32.0
+ google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3 // indirect
+ google.golang.org/protobuf v1.25.0
)
diff --git a/go/arrow/go.sum b/go/arrow/go.sum
index 25e38a1..f56e738 100644
--- a/go/arrow/go.sum
+++ b/go/arrow/go.sum
@@ -1,10 +1,104 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0=
+github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
+github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/flatbuffers v1.11.0 h1:O7CEyB8Cb3/DmtxODGtLHcEvpr81Jm5qLg/hsHnxA2A=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/stretchr/testify v1.2.0 h1:LThGCOvhuJic9Gyd1VBCkhyUXmO8vKaBFvBsJ2k03rg=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA=
+golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200909081042-eff7692f9009 h1:W0lCpv29Hv0UaM1LXb9QlBHLNP8UFfcKjblhVCWftOM=
+golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
+google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f h1:Yv4xsIx7HZOoyUGSJ2ksDyWE2qIBXROsZKt2ny3hCGM=
+google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
+google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0=
+google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
+google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3 h1:MZjUhWVLZHiPPNKvwdt31HZVHrASfgk1ScV3vVTKbDo=
+google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
+google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
+google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/go/arrow/ipc/flight_data_reader.go b/go/arrow/ipc/flight_data_reader.go
new file mode 100644
index 0000000..462286f
--- /dev/null
+++ b/go/arrow/ipc/flight_data_reader.go
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ipc
+
+import (
+ "bytes"
+ "io"
+ "sync/atomic"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/arrio"
+ "github.com/apache/arrow/go/arrow/flight"
+ "github.com/apache/arrow/go/arrow/internal/debug"
+ "github.com/apache/arrow/go/arrow/internal/flatbuf"
+ "github.com/apache/arrow/go/arrow/memory"
+ "golang.org/x/xerrors"
+)
+
+// FlightDataStreamReader wraps a grpc stream for receiving FlightData objects
+type FlightDataStreamReader interface {
+ // Recv returns the next FlightData message from the stream
+ Recv() (*flight.FlightData, error)
+}
+
+// FlightDataReader reads records from a stream of messages
+type FlightDataReader struct {
+ r FlightDataStreamReader
+ // schema is decoded from the first message of the stream
+ schema *arrow.Schema
+
+ refCount int64
+ // rec is the most recently read record; it is released before the next read
+ rec array.Record
+ // err is the last error seen while reading; a clean end of stream leaves it nil
+ err error
+
+ types dictTypeMap
+ memo dictMemo
+
+ mem memory.Allocator
+
+ // done is set once receiving from the stream has failed or reached io.EOF
+ done bool
+}
+
+// NewFlightDataReader returns a reader that will produce records from a flight data stream
+//
+// implementation is generally based on the ipc.Reader, expecting the first message to be the
+// schema with the subsequent messages being the record batches.
+//
+// The first message is consumed here. An error is returned when it cannot be
+// read, is not a schema message, declares dictionary types (not supported
+// yet), cannot be decoded, or differs from a schema supplied through the
+// options. The returned reader starts with a reference count of one, so a
+// single Release call frees it.
+func NewFlightDataReader(r FlightDataStreamReader, opts ...Option) (*FlightDataReader, error) {
+	cfg := newConfig(opts...)
+
+	rr := &FlightDataReader{
+		r: r,
+		// the caller owns the initial reference; starting at zero would mean
+		// Release can never bring the count back to zero
+		refCount: 1,
+		memo:     newMemo(),
+		mem:      cfg.alloc,
+	}
+
+	msg, err := rr.nextMessage()
+	if err != nil {
+		return nil, xerrors.Errorf("arrow/ipc: could not read message schema: %w", err)
+	}
+
+	if msg.Type() != MessageSchema {
+		return nil, xerrors.Errorf("arrow/ipc: invalid message type (got=%v, want=%v)", msg.Type(), MessageSchema)
+	}
+
+	// FIXME(sbinet) refactor msg-header handling (a la ipc.Reader.readSchema)
+	var schemaFB flatbuf.Schema
+	initFB(&schemaFB, msg.msg.Header)
+
+	rr.types, err = dictTypesFromFB(&schemaFB)
+	if err != nil {
+		return nil, xerrors.Errorf("arrow/ipc: could not read dictionary types from message schema: %w", err)
+	}
+
+	// TODO(sbinet): see ipc.Reader.readSchema (ReadNextDictionary).
+	// The schema arrives over the network, so report unsupported input as an
+	// error instead of panicking inside library code.
+	if len(rr.types) > 0 {
+		return nil, xerrors.New("arrow/ipc: dictionary types are not implemented for flight data streams")
+	}
+
+	rr.schema, err = schemaFromFB(&schemaFB, &rr.memo)
+	if err != nil {
+		return nil, xerrors.Errorf("arrow/ipc: could not decode schema from message schema: %w", err)
+	}
+
+	if cfg.schema != nil && !cfg.schema.Equal(rr.schema) {
+		return nil, errInconsistentSchema
+	}
+
+	return rr, nil
+}
+
+// nextMessage receives one FlightData from the stream and wraps its header
+// and body bytes in an ipc Message; a receive error is returned unchanged.
+func (f *FlightDataReader) nextMessage() (*Message, error) {
+	data, err := f.r.Recv()
+	if err != nil {
+		return nil, err
+	}
+
+	header := memory.NewBufferBytes(data.DataHeader)
+	body := memory.NewBufferBytes(data.DataBody)
+	return NewMessage(header, body), nil
+}
+
+func (f *FlightDataReader) next() bool {
+ var msg *Message
+ msg, f.err = f.nextMessage()
+ if f.err != nil {
+ f.done = true
+ if f.err == io.EOF {
+ f.err = nil
+ }
+ return false
+ }
+
+ if got, want := msg.Type(), MessageRecordBatch; got != want {
+ f.err = xerrors.Errorf("arrow/ipc: invalid message type (got=%v, want=%v)", got, want)
+ return false
+ }
+
+ f.rec = newRecord(f.schema, msg.meta, bytes.NewReader(msg.body.Bytes()))
+ return true
+}
+
+// Record returns the current record that has been extracted from the stream.
+// It is valid until the next call to Next or Read, both of which release it.
+func (f *FlightDataReader) Record() array.Record {
+ return f.rec
+}
+
+// Next releases the previous record, if any, and reports whether another
+// record could be extracted from the stream. It returns false without
+// touching the stream once an error has been recorded or the stream is done.
+func (f *FlightDataReader) Next() bool {
+	if prev := f.rec; prev != nil {
+		prev.Release()
+		f.rec = nil
+	}
+
+	if f.done || f.err != nil {
+		return false
+	}
+
+	return f.next()
+}
+
+// Read reads the current record from the flight stream and an error, if any.
+// When we reach the end of the flight stream it will return (nil, io.EOF).
+// A stream failure other than end-of-stream is returned as that error rather
+// than being reported as io.EOF, and keeps being returned on later calls.
+// Also calls release on the previous existing record if any.
+func (f *FlightDataReader) Read() (array.Record, error) {
+	if f.rec != nil {
+		f.rec.Release()
+		f.rec = nil
+	}
+
+	// the stream has already ended or failed; do not receive from it again
+	if f.done {
+		if f.err != nil {
+			return nil, f.err
+		}
+		return nil, io.EOF
+	}
+
+	if !f.next() {
+		// next clears f.err only for a clean io.EOF, so a non-nil error here
+		// is a real failure and must not be masked as end-of-stream
+		if f.err != nil {
+			return nil, f.err
+		}
+		return nil, io.EOF
+	}
+
+	return f.rec, nil
+}
+
+// Retain increases the refcount by 1 using an atomic add.
+// Retain can be called by multiple goroutines simultaneously.
+func (f *FlightDataReader) Retain() {
+ atomic.AddInt64(&f.refCount, 1)
+}
+
+// Release decreases the refcount by 1.
+// When the refcount reaches 0 the current record is released and the
+// reference to the underlying stream is dropped.
+// Release may be called simultaneously from multiple goroutines.
+func (f *FlightDataReader) Release() {
+	debug.Assert(atomic.LoadInt64(&f.refCount) > 0, "too many releases")
+
+	if atomic.AddInt64(&f.refCount, -1) != 0 {
+		return
+	}
+
+	if f.rec != nil {
+		f.rec.Release()
+		f.rec = nil
+	}
+	f.r = nil
+}
+
+// Err returns the last error encountered during the iteration of the stream.
+func (f *FlightDataReader) Err() error { return f.err }
+
+// Schema returns the schema of the underlying records as described by the
+// first message received, which NewFlightDataReader decodes.
+func (f *FlightDataReader) Schema() *arrow.Schema { return f.schema }
+
+// SchemaFromFlightInfo decodes an arrow schema from bytes holding a flatbuffer
+// Schema table, such as the Schema field of a FlightInfo or SchemaResult.
+// Empty input is rejected with an error instead of being handed to the
+// flatbuffer accessor.
+func SchemaFromFlightInfo(b []byte) (*arrow.Schema, error) {
+	if len(b) == 0 {
+		return nil, xerrors.New("arrow/ipc: cannot decode schema from empty flight info bytes")
+	}
+
+	fb := flatbuf.GetRootAsSchema(b, 0)
+	dict := newMemo()
+	return schemaFromFB(fb, &dict)
+}
+
+// compile-time checks that *FlightDataReader satisfies the reader interfaces
+var (
+ _ array.RecordReader = (*FlightDataReader)(nil)
+ _ arrio.Reader = (*FlightDataReader)(nil)
+)
diff --git a/go/arrow/ipc/flight_data_writer.go b/go/arrow/ipc/flight_data_writer.go
new file mode 100644
index 0000000..7a5bc27
--- /dev/null
+++ b/go/arrow/ipc/flight_data_writer.go
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ipc
+
+import (
+ "bytes"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/arrio"
+ "github.com/apache/arrow/go/arrow/bitutil"
+ "github.com/apache/arrow/go/arrow/flight"
+ "github.com/apache/arrow/go/arrow/memory"
+ flatbuffers "github.com/google/flatbuffers/go"
+ "golang.org/x/xerrors"
+)
+
+// FlightDataStreamWriter wraps a grpc stream for sending FlightData.
+// FlightDataWriter sends every message it produces through this interface.
+type FlightDataStreamWriter interface {
+ // Send sends a single FlightData message, returning any error from the stream.
+ Send(*flight.FlightData) error
+}
+
+// FlightDataWriter is a stream writer for writing with Flight RPC
+type FlightDataWriter struct {
+ w FlightDataStreamWriter // destination stream for outgoing messages
+ fd flight.FlightData // message value reused for every payload that is sent
+ buf bytes.Buffer // scratch buffer reset and refilled for each message body
+
+ mem memory.Allocator // allocator taken from the writer options
+ started bool // set as soon as start has been entered
+ schema *arrow.Schema // schema taken from the writer options; records must match it
+}
+
+// NewFlightDataWriter creates a FlightDataWriter that writes array Records
+// to the given flight data stream. The allocator and schema are taken from
+// the configuration built out of opts.
+func NewFlightDataWriter(w FlightDataStreamWriter, opts ...Option) *FlightDataWriter {
+	cfg := newConfig(opts...)
+
+	fw := new(FlightDataWriter)
+	fw.w = w
+	fw.mem = cfg.alloc
+	fw.schema = cfg.schema
+	return fw
+}
+
+// start marks the writer as started and sends the payloads generated from
+// the writer's schema, stopping at the first error. The started flag is set
+// before anything is sent, so it stays set even when sending fails.
+func (w *FlightDataWriter) start() error {
+	w.started = true
+
+	payloads := payloadsFromSchema(w.schema, w.mem, nil)
+	defer payloads.Release()
+
+	for idx := 0; idx < len(payloads); idx++ {
+		if err := w.writePayload(&payloads[idx]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Close finishes the writer. If the writer has not started yet, the schema
+// payloads are sent now so the stream still carries the schema. Close does
+// not close the underlying stream.
+func (w *FlightDataWriter) Close() (err error) {
+	if w.started {
+		return nil
+	}
+	return w.start()
+}
+
+// Write encodes rec and sends it on the underlying stream, starting the
+// writer (which sends the schema payloads) first if that has not happened.
+// It returns errInconsistentSchema when rec's schema is nil or is not equal
+// to the writer's schema.
+func (w *FlightDataWriter) Write(rec array.Record) error {
+	if !w.started {
+		if err := w.start(); err != nil {
+			return err
+		}
+	}
+
+	if recSchema := rec.Schema(); recSchema == nil || !recSchema.Equal(w.schema) {
+		return errInconsistentSchema
+	}
+
+	const allow64b = true
+	var body payload
+	encoder := newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b)
+	// Release the encoded payload once it has been handed to the stream.
+	defer body.Release()
+
+	if err := encoder.Encode(&body, rec); err != nil {
+		return xerrors.Errorf("arrow/ipc: could not encode record to payload: %w", err)
+	}
+	return w.writePayload(&body)
+}
+
+// writePayload fills the writer's reusable FlightData message from data and
+// sends it on the stream. The payload metadata becomes the data header. Each
+// non-nil body buffer is appended to the scratch buffer, followed by padding
+// up to the size given by bitutil.CeilByte64, and the result becomes the
+// data body.
+func (w *FlightDataWriter) writePayload(data *payload) (err error) {
+	w.fd.DataHeader = data.meta.Bytes()
+
+	body := &w.buf
+	body.Reset()
+
+	for _, b := range data.body {
+		if b == nil {
+			continue
+		}
+
+		n := int64(b.Len())
+		pad := bitutil.CeilByte64(n) - n
+
+		if n > 0 {
+			if _, err = body.Write(b.Bytes()); err != nil {
+				return xerrors.Errorf("arrow/ipc: could not write payload message body: %w", err)
+			}
+		}
+		if pad > 0 {
+			if _, err = body.Write(paddingBytes[:pad]); err != nil {
+				return xerrors.Errorf("arrow/ipc: could not write payload message padding: %w", err)
+			}
+		}
+	}
+
+	// DataBody aliases the scratch buffer, which is reset on the next call.
+	w.fd.DataBody = body.Bytes()
+	return w.w.Send(&w.fd)
+}
+
+func FlightInfoSchemaBytes(schema *arrow.Schema, mem memory.Allocator) []byte {
+ dict := newMemo()
+ b := flatbuffers.NewBuilder(1024)
+ offset := schemaToFB(b, schema, &dict)
+ b.Finish(offset)
+ return b.FinishedBytes()
+}
+
+// Compile-time check that *FlightDataWriter satisfies arrio.Writer.
+var (
+ _ arrio.Writer = (*FlightDataWriter)(nil)
+)