Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/02 11:15:40 UTC
[spark-connect-go] branch master updated: [SPARK-43351] Add init Spark Connect Go files
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2e2d7a4 [SPARK-43351] Add init Spark Connect Go files
2e2d7a4 is described below
commit 2e2d7a48f4a9626034ca11c691bcfc042d42e953
Author: Bo Yang <bo...@apple.com>
AuthorDate: Fri Jun 2 20:15:35 2023 +0900
[SPARK-43351] Add init Spark Connect Go files
### What changes were proposed in this pull request?
This pull request adds a small prototype of a Spark Connect Go client, together with example programs.
JIRA: https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-43351
### Why are the changes needed?
Spark Connect was released in Spark 3.4.0, but there is no Go client yet. Adding one lets Go programmers use Spark Connect.
### Does this PR introduce _any_ user-facing change?
Yes. Users will be able to write Spark Connect applications in Go. A very simple example looks like the following:
```
package main

import "github.com/apache/spark-connect-go/v_3_4/client/sql"

func main() {
	remote := "localhost:15002"
	spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
	defer spark.Stop()
	df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
	df.Show(100, false)
}
```
### How was this patch tested?
Manually tested by running the example Go code.
Closes #6 from hiboyang/bo-dev-01.
Lead-authored-by: Bo Yang <bo...@apple.com>
Co-authored-by: Hyukjin Kwon <gu...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.gitignore | 3 +
README.md | 68 ++++++
client/sql/dataframe.go | 282 ++++++++++++++++++++++
client/sql/dataframe_test.go | 57 +++++
client/sql/datatype.go | 63 +++++
client/sql/plan.go | 26 ++
client/sql/row.go | 35 +++
client/sql/sparksession.go | 134 ++++++++++
client/sql/structtype.go | 28 +++
cmd/spark-connect-example-raw-grpc-client/main.go | 64 +++++
cmd/spark-connect-example-spark-session/main.go | 73 ++++++
go.sum | 31 +++
12 files changed, 864 insertions(+)
diff --git a/.gitignore b/.gitignore
index 0bca2cf..f61e556 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,6 @@ internal/generated.out
# Ignore Coverage Files
coverage*
+
+# Ignore IDE files
+.idea/
diff --git a/README.md b/README.md
index b2be4d3..4422b9a 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,74 @@ git submodule init --depth 1
make gen && make check && make test
```
+## Spark Connect Go Application Example
+
+A very simple example in Go looks like the following:
+
+```
+func main() {
+ remote := "localhost:15002"
+ spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
+ defer spark.Stop()
+
+ df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
+ df.Show(100, false)
+}
+```
+
+## High Level Design
+
+The following [diagram](https://textik.com/#ac299c8f32c4c342) shows the main components of the current prototype:
+
+```
+ +-------------------+
+ | |
+ | dataFrameImpl |
+ | |
+ +-------------------+
+ |
+ |
+ +
+ +-------------------+
+ | |
+ | sparkSessionImpl |
+ | |
+ +-------------------+
+ |
+ |
+ +
++---------------------------+ +----------------+
+| | | |
+| SparkConnectServiceClient |--------------+| Spark Driver |
+| | | |
++---------------------------+ +----------------+
+
+```
+
+`SparkConnectServiceClient` is the gRPC client that talks to the Spark driver. `sparkSessionImpl` creates `dataFrameImpl`
+instances, and each `dataFrameImpl` uses the gRPC client held by its `sparkSessionImpl` to communicate with the Spark driver.
+
+We will mirror the logic of the Spark Connect Scala implementation while adopting common Go practices, e.g. returning an `error`
+value for error handling.
+
+## How to Run Spark Connect Go Application
+
+1. Install Golang: https://go.dev/doc/install.
+
+2. Download a Spark distribution (3.4.0+) and unzip it.
+
+3. Start the Spark Connect server by running:
+
+```
+sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
+```
+
+4. From this repo, run the Go application:
+
+```
+go run cmd/spark-connect-example-spark-session/main.go
+```
+
## Contributing
Please review the [Contribution to Spark guide](https://spark.apache.org/contributing.html)
diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
new file mode 100644
index 0000000..64e7646
--- /dev/null
+++ b/client/sql/dataframe.go
@@ -0,0 +1,282 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "github.com/apache/arrow/go/v12/arrow"
+ "github.com/apache/arrow/go/v12/arrow/array"
+ "github.com/apache/arrow/go/v12/arrow/ipc"
+ proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+ "io"
+)
+
+type DataFrame interface {
+ Show(numRows int, truncate bool) error
+ Schema() (*StructType, error)
+ Collect() ([]Row, error)
+}
+
+type dataFrameImpl struct {
+ sparkSession *sparkSessionImpl
+ relation *proto.Relation // TODO change to proto.Plan?
+}
+
+func (df *dataFrameImpl) Show(numRows int, truncate bool) error {
+ truncateValue := 0
+ if truncate {
+ truncateValue = 20
+ }
+ vertical := false
+
+ plan := &proto.Plan{
+ OpType: &proto.Plan_Root{
+ Root: &proto.Relation{
+ Common: &proto.RelationCommon{
+ PlanId: newPlanId(),
+ },
+ RelType: &proto.Relation_ShowString{
+ ShowString: &proto.ShowString{
+ Input: df.relation,
+ NumRows: int32(numRows),
+ Truncate: int32(truncateValue),
+ Vertical: vertical,
+ },
+ },
+ },
+ },
+ }
+
+ responseClient, err := df.sparkSession.executePlan(plan)
+ if err != nil {
+ return fmt.Errorf("failed to show dataframe: %w", err)
+ }
+
+ for {
+ response, err := responseClient.Recv()
+ if err != nil {
+ return fmt.Errorf("failed to receive show response: %w", err)
+ }
+ arrowBatch := response.GetArrowBatch()
+ if arrowBatch == nil {
+ continue
+ }
+ err = showArrowBatch(arrowBatch)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+
+ return fmt.Errorf("did not get arrow batch in response")
+}
+
+func (df *dataFrameImpl) Schema() (*StructType, error) {
+ response, err := df.sparkSession.analyzePlan(df.createPlan())
+ if err != nil {
+ return nil, fmt.Errorf("failed to analyze plan: %w", err)
+ }
+
+ responseSchema := response.GetSchema().Schema
+ result := convertProtoDataTypeToStructType(responseSchema)
+ return result, nil
+}
+
+func (df *dataFrameImpl) Collect() ([]Row, error) {
+ responseClient, err := df.sparkSession.executePlan(df.createPlan())
+ if err != nil {
+ return nil, fmt.Errorf("failed to execute plan: %w", err)
+ }
+
+ var schema *StructType
+ var allRows []Row
+
+ for {
+ response, err := responseClient.Recv()
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ return allRows, nil
+ } else {
+ return nil, fmt.Errorf("failed to receive plan execution response: %w", err)
+ }
+ }
+
+ dataType := response.GetSchema()
+ if dataType != nil {
+ schema = convertProtoDataTypeToStructType(dataType)
+ continue
+ }
+
+ arrowBatch := response.GetArrowBatch()
+ if arrowBatch == nil {
+ continue
+ }
+
+ rowBatch, err := readArrowBatchData(arrowBatch.Data, schema)
+ if err != nil {
+ return nil, err
+ }
+
+ if allRows == nil {
+ allRows = make([]Row, 0, len(rowBatch))
+ }
+ allRows = append(allRows, rowBatch...)
+ }
+
+ return allRows, nil
+}
+
+func (df *dataFrameImpl) createPlan() *proto.Plan {
+ return &proto.Plan{
+ OpType: &proto.Plan_Root{
+ Root: &proto.Relation{
+ Common: &proto.RelationCommon{
+ PlanId: newPlanId(),
+ },
+ RelType: df.relation.RelType,
+ },
+ },
+ }
+}
+
+func showArrowBatch(arrowBatch *proto.ExecutePlanResponse_ArrowBatch) error {
+ return showArrowBatchData(arrowBatch.Data)
+}
+
+func showArrowBatchData(data []byte) error {
+ rows, err := readArrowBatchData(data, nil)
+ if err != nil {
+ return err
+ }
+ for _, row := range rows {
+ values, err := row.Values()
+ if err != nil {
+ return fmt.Errorf("failed to get values in the row: %w", err)
+ }
+ fmt.Println(values...)
+ }
+ return nil
+}
+
+func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
+ reader := bytes.NewReader(data)
+ arrowReader, err := ipc.NewReader(reader)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create arrow reader: %w", err)
+ }
+ defer arrowReader.Release()
+
+ var rows []Row
+
+ for {
+ record, err := arrowReader.Read()
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ return rows, nil
+ } else {
+ return nil, fmt.Errorf("failed to read arrow: %w", err)
+ }
+ }
+ numColumns := len(arrowReader.Schema().Fields())
+ numRows := int(record.NumRows())
+ if rows == nil {
+ rows = make([]Row, 0, numRows)
+ }
+ values := make([][]any, numRows)
+ for i := range values {
+ values[i] = make([]any, numColumns)
+ }
+ for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
+ columnData := record.Column(columnIndex).Data()
+ dataTypeId := columnData.DataType().ID()
+ switch dataTypeId {
+ case arrow.STRING:
+ vector := array.NewStringData(columnData)
+ for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+ values[rowIndex][columnIndex] = vector.Value(rowIndex)
+ }
+ case arrow.INT32:
+ vector := array.NewInt32Data(columnData)
+ for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+ values[rowIndex][columnIndex] = vector.Value(rowIndex)
+ }
+ case arrow.INT64:
+ vector := array.NewInt64Data(columnData)
+ for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+ values[rowIndex][columnIndex] = vector.Value(rowIndex)
+ }
+ default:
+ return nil, fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeId.String(), columnIndex)
+ }
+ }
+
+ for _, v := range values {
+ row := &GenericRowWithSchema{
+ schema: schema,
+ values: v,
+ }
+ rows = append(rows, row)
+ }
+
+ hasNext := arrowReader.Next()
+ if !hasNext {
+ break
+ }
+ }
+
+ return rows, nil
+}
+
+func convertProtoDataTypeToStructType(input *proto.DataType) *StructType {
+ dataTypeStruct := input.GetStruct()
+ if dataTypeStruct == nil {
+ panic("dataType.GetStruct() is nil")
+ }
+ return &StructType{
+ Fields: convertProtoStructFields(dataTypeStruct.Fields),
+ }
+}
+
+func convertProtoStructFields(input []*proto.DataType_StructField) []StructField {
+ result := make([]StructField, len(input))
+ for i, f := range input {
+ result[i] = convertProtoStructField(f)
+ }
+ return result
+}
+
+func convertProtoStructField(field *proto.DataType_StructField) StructField {
+ return StructField{
+ Name: field.Name,
+ DataType: convertProtoDataTypeToDataType(field.DataType),
+ }
+}
+
+func convertProtoDataTypeToDataType(input *proto.DataType) DataType {
+ switch v := input.GetKind().(type) {
+ case *proto.DataType_Integer_:
+ return IntegerType{}
+ case *proto.DataType_String_:
+ return StringType{}
+ default:
+ return UnsupportedType{
+ TypeInfo: v,
+ }
+ }
+}
diff --git a/client/sql/dataframe_test.go b/client/sql/dataframe_test.go
new file mode 100644
index 0000000..d8c3c80
--- /dev/null
+++ b/client/sql/dataframe_test.go
@@ -0,0 +1,57 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+ "bytes"
+ "github.com/apache/arrow/go/v12/arrow"
+ "github.com/apache/arrow/go/v12/arrow/array"
+ "github.com/apache/arrow/go/v12/arrow/ipc"
+ "github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "testing"
+)
+
+func TestShowArrowBatchData(t *testing.T) {
+ arrowFields := []arrow.Field{
+ {
+ Name: "show_string",
+ Type: &arrow.StringType{},
+ },
+ }
+ arrowSchema := arrow.NewSchema(arrowFields, nil)
+ var buf bytes.Buffer
+ arrowWriter := ipc.NewWriter(&buf, ipc.WithSchema(arrowSchema))
+ defer arrowWriter.Close()
+
+ alloc := memory.NewGoAllocator()
+ recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+ defer recordBuilder.Release()
+
+ recordBuilder.Field(0).(*array.StringBuilder).Append("str1a\nstr1b")
+ recordBuilder.Field(0).(*array.StringBuilder).Append("str2")
+
+ record := recordBuilder.NewRecord()
+ defer record.Release()
+
+ err := arrowWriter.Write(record)
+ require.Nil(t, err)
+
+ err = showArrowBatchData(buf.Bytes())
+ assert.Nil(t, err)
+}
diff --git a/client/sql/datatype.go b/client/sql/datatype.go
new file mode 100644
index 0000000..05ab02e
--- /dev/null
+++ b/client/sql/datatype.go
@@ -0,0 +1,63 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+ "reflect"
+ "strings"
+)
+
+type DataType interface {
+ TypeName() string
+}
+
+type IntegerType struct {
+}
+
+func (t IntegerType) TypeName() string {
+ return getDataTypeName(t)
+}
+
+type StringType struct {
+}
+
+func (t StringType) TypeName() string {
+ return getDataTypeName(t)
+}
+
+type UnsupportedType struct {
+ TypeInfo any
+}
+
+func (t UnsupportedType) TypeName() string {
+ return getDataTypeName(t)
+}
+
+func getDataTypeName(dataType DataType) string {
+ t := reflect.TypeOf(dataType)
+ if t == nil {
+ return "(nil)"
+ }
+ var name string
+ if t.Kind() == reflect.Ptr {
+ name = t.Elem().Name()
+ } else {
+ name = t.Name()
+ }
+ name = strings.TrimSuffix(name, "Type")
+ return name
+}
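`getDataTypeName` derives the display name from the Go type itself: it reflects on the dynamic type, follows pointers through the `reflect.Ptr` branch, and trims the `Type` suffix, so `IntegerType{}` reports `Integer`. The standalone copy below (with local stand-in types, since the `sql` package is not imported) makes the mapping concrete.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// DataType mirrors the interface added in client/sql/datatype.go.
type DataType interface {
	TypeName() string
}

type IntegerType struct{}
type StringType struct{}

func (t IntegerType) TypeName() string { return getDataTypeName(t) }
func (t StringType) TypeName() string  { return getDataTypeName(t) }

// getDataTypeName reproduces the helper from datatype.go: it reads the
// dynamic type via reflection, dereferences pointer types, and trims the
// "Type" suffix from the struct name.
func getDataTypeName(dataType DataType) string {
	t := reflect.TypeOf(dataType)
	if t == nil {
		return "(nil)"
	}
	var name string
	if t.Kind() == reflect.Ptr {
		name = t.Elem().Name()
	} else {
		name = t.Name()
	}
	return strings.TrimSuffix(name, "Type")
}

func main() {
	fmt.Println(IntegerType{}.TypeName())        // Integer
	fmt.Println(StringType{}.TypeName())         // String
	fmt.Println(getDataTypeName(&IntegerType{})) // Integer (pointer is dereferenced)
	fmt.Println(getDataTypeName(nil))            // (nil)
}
```

Reflection keeps each new type's `TypeName` a one-liner, at the cost of tying the reported name to the Go identifier.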
diff --git a/client/sql/plan.go b/client/sql/plan.go
new file mode 100644
index 0000000..66b9e05
--- /dev/null
+++ b/client/sql/plan.go
@@ -0,0 +1,26 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import "sync/atomic"
+
+var atomicInt64 atomic.Int64
+
+func newPlanId() *int64 {
+ v := atomicInt64.Add(1)
+ return &v
+}
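`newPlanId` draws IDs from a package-level `atomic.Int64` (a type added in Go 1.19), so concurrent plan construction never hands out the same ID. It also returns the address of a fresh local variable, so every `RelationCommon.PlanId` gets its own `*int64`. A minimal standalone check of the uniqueness property, with `collectIDs` as a hypothetical test helper:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var atomicInt64 atomic.Int64

// newPlanId matches the helper in client/sql/plan.go: Add is atomic, and the
// result is copied into a local variable so each caller gets its own pointer.
func newPlanId() *int64 {
	v := atomicInt64.Add(1)
	return &v
}

// collectIDs calls newPlanId from n goroutines and gathers the results.
func collectIDs(n int) []int64 {
	ids := make([]int64, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ids[i] = *newPlanId() // each goroutine writes only its own slot
		}(i)
	}
	wg.Wait()
	return ids
}

func main() {
	ids := collectIDs(1000)
	seen := make(map[int64]bool, len(ids))
	for _, id := range ids {
		if seen[id] {
			fmt.Println("duplicate plan id:", id)
			return
		}
		seen[id] = true
	}
	fmt.Println("unique ids:", len(seen)) // unique ids: 1000
}
```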
diff --git a/client/sql/row.go b/client/sql/row.go
new file mode 100644
index 0000000..3bee2ac
--- /dev/null
+++ b/client/sql/row.go
@@ -0,0 +1,35 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+type Row interface {
+ Schema() (*StructType, error)
+ Values() ([]any, error)
+}
+
+type GenericRowWithSchema struct {
+ values []any
+ schema *StructType
+}
+
+func (r *GenericRowWithSchema) Schema() (*StructType, error) {
+ return r.schema, nil
+}
+
+func (r *GenericRowWithSchema) Values() ([]any, error) {
+ return r.values, nil
+}
diff --git a/client/sql/sparksession.go b/client/sql/sparksession.go
new file mode 100644
index 0000000..86f4cd1
--- /dev/null
+++ b/client/sql/sparksession.go
@@ -0,0 +1,134 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+ "context"
+ "fmt"
+ proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+ "github.com/google/uuid"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+var SparkSession sparkSessionBuilderEntrypoint
+
+type sparkSession interface {
+ Sql(query string) (DataFrame, error)
+ Stop() error
+}
+
+type sparkSessionBuilderEntrypoint struct {
+ Builder SparkSessionBuilder
+}
+
+type SparkSessionBuilder struct {
+ connectionString string
+}
+
+func (s SparkSessionBuilder) Remote(connectionString string) SparkSessionBuilder {
+ copy := s
+ copy.connectionString = connectionString
+ return copy
+}
+
+func (s SparkSessionBuilder) Build() (sparkSession, error) {
+ opts := []grpc.DialOption{
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ }
+
+ conn, err := grpc.Dial(s.connectionString, opts...)
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to remote %s: %w", s.connectionString, err)
+ }
+
+ client := proto.NewSparkConnectServiceClient(conn)
+ return &sparkSessionImpl{
+ sessionId: uuid.NewString(),
+ client: client,
+ }, nil
+}
+
+type sparkSessionImpl struct {
+ sessionId string
+ client proto.SparkConnectServiceClient
+}
+
+func (s *sparkSessionImpl) Sql(query string) (DataFrame, error) {
+ plan := &proto.Plan{
+ OpType: &proto.Plan_Command{
+ Command: &proto.Command{
+ CommandType: &proto.Command_SqlCommand{
+ SqlCommand: &proto.SqlCommand{
+ Sql: query,
+ },
+ },
+ },
+ },
+ }
+ responseClient, err := s.executePlan(plan)
+ if err != nil {
+ return nil, fmt.Errorf("failed to execute sql: %s: %w", query, err)
+ }
+ for {
+ response, err := responseClient.Recv()
+ if err != nil {
+ return nil, fmt.Errorf("failed to receive ExecutePlan response: %w", err)
+ }
+ sqlCommandResult := response.GetSqlCommandResult()
+ if sqlCommandResult == nil {
+ continue
+ }
+ return &dataFrameImpl{
+ sparkSession: s,
+ relation: sqlCommandResult.GetRelation(),
+ }, nil
+ }
+ return nil, fmt.Errorf("failed to get SqlCommandResult in ExecutePlan response")
+}
+
+func (s *sparkSessionImpl) Stop() error {
+ return nil
+}
+
+func (s *sparkSessionImpl) executePlan(plan *proto.Plan) (proto.SparkConnectService_ExecutePlanClient, error) {
+ request := proto.ExecutePlanRequest{
+ SessionId: s.sessionId,
+ Plan: plan,
+ }
+ executePlanClient, err := s.client.ExecutePlan(context.TODO(), &request)
+ if err != nil {
+ return nil, fmt.Errorf("failed to call ExecutePlan in session %s: %w", s.sessionId, err)
+ }
+ return executePlanClient, nil
+}
+
+func (s *sparkSessionImpl) analyzePlan(plan *proto.Plan) (*proto.AnalyzePlanResponse, error) {
+ request := proto.AnalyzePlanRequest{
+ SessionId: s.sessionId,
+ Analyze: &proto.AnalyzePlanRequest_Schema_{
+ Schema: &proto.AnalyzePlanRequest_Schema{
+ Plan: plan,
+ },
+ },
+ }
+ response, err := s.client.AnalyzePlan(context.TODO(), &request)
+ if err != nil {
+ return nil, fmt.Errorf("failed to call AnalyzePlan in session %s: %w", s.sessionId, err)
+ }
+ return response, nil
+}
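`Remote` uses a value receiver and returns a modified copy, so the exported `SparkSession.Builder` entrypoint is never mutated and builders derived from it stay independent. A reduced sketch of that pattern with stand-in types (no gRPC dialing; `spark-host:15002` is a made-up address):

```go
package main

import "fmt"

// SparkSessionBuilder mirrors the builder in client/sql/sparksession.go,
// reduced to the field that Remote sets.
type SparkSessionBuilder struct {
	connectionString string
}

// Remote has a value receiver: s is already a copy, and the modified copy
// is returned, leaving the caller's builder untouched.
func (s SparkSessionBuilder) Remote(connectionString string) SparkSessionBuilder {
	s.connectionString = connectionString
	return s
}

type sparkSessionBuilderEntrypoint struct {
	Builder SparkSessionBuilder
}

// SparkSession plays the role of the exported package-level entrypoint.
var SparkSession sparkSessionBuilderEntrypoint

func main() {
	a := SparkSession.Builder.Remote("localhost:15002")
	b := SparkSession.Builder.Remote("spark-host:15002")
	fmt.Println(a.connectionString)                           // localhost:15002
	fmt.Println(b.connectionString)                           // spark-host:15002
	fmt.Printf("%q\n", SparkSession.Builder.connectionString) // "" (entrypoint unchanged)
}
```

Assigning to the receiver `s` directly is equivalent to the original `copy := s`, since the receiver is already a copy, and it avoids shadowing Go's built-in `copy`.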
diff --git a/client/sql/structtype.go b/client/sql/structtype.go
new file mode 100644
index 0000000..2a59c30
--- /dev/null
+++ b/client/sql/structtype.go
@@ -0,0 +1,28 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+type StructField struct {
+ Name string
+ DataType DataType
+ Nullable bool // default should be true
+}
+
+type StructType struct {
+ TypeName string
+ Fields []StructField
+}
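The comment on `Nullable` says the default should be `true`, but Go's zero value for `bool` is `false`, and `convertProtoStructField` in dataframe.go does not set the field, so every converted field currently reports `Nullable: false`. One way to make an intended default stick is a constructor; `NewStructField` below is hypothetical and not part of this commit, and `IntegerType` is a simplified stand-in.

```go
package main

import "fmt"

// DataType and IntegerType are minimal stand-ins for the types in datatype.go.
type DataType interface{ TypeName() string }

type IntegerType struct{}

func (IntegerType) TypeName() string { return "Integer" }

// StructField mirrors client/sql/structtype.go.
type StructField struct {
	Name     string
	DataType DataType
	Nullable bool
}

// NewStructField is a hypothetical constructor that applies the intended
// default of Nullable = true; callers can still override it afterwards.
func NewStructField(name string, dataType DataType) StructField {
	return StructField{Name: name, DataType: dataType, Nullable: true}
}

func main() {
	literal := StructField{Name: "count", DataType: IntegerType{}}
	built := NewStructField("count", IntegerType{})
	fmt.Println(literal.Nullable) // false: the zero value for bool
	fmt.Println(built.Nullable)   // true: default applied by the constructor
}
```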
diff --git a/cmd/spark-connect-example-raw-grpc-client/main.go b/cmd/spark-connect-example-raw-grpc-client/main.go
new file mode 100644
index 0000000..8398292
--- /dev/null
+++ b/cmd/spark-connect-example-raw-grpc-client/main.go
@@ -0,0 +1,64 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "flag"
+ proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+ "github.com/google/uuid"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "log"
+ "time"
+)
+
+var (
+ remote = flag.String("remote", "localhost:15002", "the remote address of Spark Connect server to connect to")
+)
+
+func main() {
+ opts := []grpc.DialOption{
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ }
+
+ conn, err := grpc.Dial(*remote, opts...)
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+ defer conn.Close()
+
+ client := proto.NewSparkConnectServiceClient(conn)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ configRequest := proto.ConfigRequest{
+ SessionId: uuid.NewString(),
+ Operation: &proto.ConfigRequest_Operation{
+ OpType: &proto.ConfigRequest_Operation_GetAll{
+ GetAll: &proto.ConfigRequest_GetAll{},
+ },
+ },
+ }
+ configResponse, err := client.Config(ctx, &configRequest)
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ log.Printf("configResponse: %v", configResponse)
+}
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
new file mode 100644
index 0000000..0f4a6cc
--- /dev/null
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -0,0 +1,73 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "flag"
+ "github.com/apache/spark-connect-go/v_3_4/client/sql"
+ "log"
+)
+
+var (
+ remote = flag.String("remote", "localhost:15002",
+ "the remote address of Spark Connect server to connect to")
+)
+
+func main() {
+ spark, err := sql.SparkSession.Builder.Remote(*remote).Build()
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+ defer spark.Stop()
+
+ df, err := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ err = df.Show(100, false)
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ schema, err := df.Schema()
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ for _, f := range schema.Fields {
+ log.Printf("Field in dataframe schema: %s - %s", f.Name, f.DataType.TypeName())
+ }
+
+ rows, err := df.Collect()
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ schema, err = rows[0].Schema()
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ for _, f := range schema.Fields {
+ log.Printf("Field in row: %s - %s", f.Name, f.DataType.TypeName())
+ }
+
+ for _, row := range rows {
+ log.Printf("Row: %v", row)
+ }
+}
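The example reads `rows[0]` without checking `len(rows)`, so a query that returns no rows would panic with an index-out-of-range error. A guarded variant, sketched with a stand-in `Row` that has the same two methods as the interface in row.go and a hypothetical `firstRowSchema` helper:

```go
package main

import (
	"errors"
	"fmt"
)

// StructField and StructType are reduced stand-ins for structtype.go.
type StructField struct{ Name string }
type StructType struct{ Fields []StructField }

// Row mirrors the interface in client/sql/row.go.
type Row interface {
	Schema() (*StructType, error)
	Values() ([]any, error)
}

type genericRow struct {
	schema *StructType
	values []any
}

func (r *genericRow) Schema() (*StructType, error) { return r.schema, nil }
func (r *genericRow) Values() ([]any, error)       { return r.values, nil }

// firstRowSchema is a hypothetical helper: it returns the schema of the first
// row, or an error instead of panicking when the result set is empty.
func firstRowSchema(rows []Row) (*StructType, error) {
	if len(rows) == 0 {
		return nil, errors.New("query returned no rows")
	}
	return rows[0].Schema()
}

func main() {
	_, err := firstRowSchema(nil)
	fmt.Println(err) // query returned no rows

	schema := &StructType{Fields: []StructField{{Name: "word"}, {Name: "count"}}}
	rows := []Row{&genericRow{schema: schema, values: []any{"apple", int32(123)}}}
	s, _ := firstRowSchema(rows)
	fmt.Println(len(s.Fields)) // 2
}
```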
diff --git a/go.sum b/go.sum
index 7fb61d6..f8c434e 100644
--- a/go.sum
+++ b/go.sum
@@ -1,40 +1,66 @@
+github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
+github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
+github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2OK5cmc=
github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg=
+github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM=
github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
+github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
+github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
+github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
+github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
@@ -43,11 +69,14 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
+gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w=
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
@@ -57,5 +86,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=