Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/02 11:15:40 UTC

[spark-connect-go] branch master updated: [SPARK-43351] Add init Spark Connect Go files

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e2d7a4  [SPARK-43351] Add init Spark Connect Go files
2e2d7a4 is described below

commit 2e2d7a48f4a9626034ca11c691bcfc042d42e953
Author: Bo Yang <bo...@apple.com>
AuthorDate: Fri Jun 2 20:15:35 2023 +0900

    [SPARK-43351] Add init Spark Connect Go files
    
    ### What changes were proposed in this pull request?
    
    This pull request adds a small Spark Connect Go client prototype and an example.
    
    JIRA: https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-43351
    
    ### Why are the changes needed?
    
    Spark Connect was released in Spark 3.4.0, but there is no Go client yet. A Go client lets Go programmers use Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users will be able to write Spark Connect applications in Go. A very simple example looks like the following:
    ```
    func main() {
            remote := "localhost:15002"
            spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
            defer spark.Stop()
    
            df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
            df.Show(100, false)
    }
    ```
    
    ### How was this patch tested?
    
    Manually tested by running the example Go code.
    
    Closes #6 from hiboyang/bo-dev-01.
    
    Lead-authored-by: Bo Yang <bo...@apple.com>
    Co-authored-by: Hyukjin Kwon <gu...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .gitignore                                        |   3 +
 README.md                                         |  68 ++++++
 client/sql/dataframe.go                           | 282 ++++++++++++++++++++++
 client/sql/dataframe_test.go                      |  57 +++++
 client/sql/datatype.go                            |  63 +++++
 client/sql/plan.go                                |  26 ++
 client/sql/row.go                                 |  35 +++
 client/sql/sparksession.go                        | 134 ++++++++++
 client/sql/structtype.go                          |  28 +++
 cmd/spark-connect-example-raw-grpc-client/main.go |  64 +++++
 cmd/spark-connect-example-spark-session/main.go   |  73 ++++++
 go.sum                                            |  31 +++
 12 files changed, 864 insertions(+)

diff --git a/.gitignore b/.gitignore
index 0bca2cf..f61e556 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,6 @@ internal/generated.out
 
 # Ignore Coverage Files
 coverage*
+
+# Ignore IDE files
+.idea/
diff --git a/README.md b/README.md
index b2be4d3..4422b9a 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,74 @@ git submodule init --depth 1
 make gen && make check && make test
 ```
 
+## Spark Connect Go Application Example
+
+A very simple example in Go looks like the following:
+
+```
+func main() {
+	remote := "localhost:15002"
+	spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
+	defer spark.Stop()
+
+	df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
+	df.Show(100, false)
+}
+```
+
+## High Level Design
+
+The following [diagram](https://textik.com/#ac299c8f32c4c342) shows the main components of the current prototype:
+
+```
+    +-------------------+                                                                              
+    |                   |                                                                              
+    |   dataFrameImpl   |                                                                              
+    |                   |                                                                              
+    +-------------------+                                                                              
+              |                                                                                        
+              |                                                                                        
+              +                                                                                        
+    +-------------------+                                                                              
+    |                   |                                                                              
+    | sparkSessionImpl  |                                                                              
+    |                   |                                                                              
+    +-------------------+                                                                              
+              |                                                                                        
+              |                                                                                        
+              +                                                                                        
++---------------------------+               +----------------+                                         
+|                           |               |                |                                         
+| SparkConnectServiceClient |--------------+|  Spark Driver  |                                         
+|                           |               |                |                                         
++---------------------------+               +----------------+
+
+```
+
+`SparkConnectServiceClient` is the gRPC client that talks to the Spark driver. `sparkSessionImpl` creates `dataFrameImpl`
+instances, and each `dataFrameImpl` uses the gRPC client held by its `sparkSessionImpl` to communicate with the Spark driver.
+
+We will mimic the logic of the Spark Connect Scala implementation while adopting common Go practices, e.g. returning an `error` value
+for error handling.
+
+## How to Run Spark Connect Go Application
+
+1. Install Golang: https://go.dev/doc/install.
+
+2. Download a Spark distribution (3.4.0+) and unzip it.
+
+3. Start the Spark Connect server by running the following command:
+
+```
+sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
+```
+
+4. In this repo, run the Go application:
+
+```
+go run cmd/spark-connect-example-spark-session/main.go
+```
+
 ## Contributing
 
 Please review the [Contribution to Spark guide](https://spark.apache.org/contributing.html)
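The layering in the README's High Level Design section (`dataFrameImpl` uses `sparkSessionImpl`, which holds the `SparkConnectServiceClient`) can be sketched with stdlib-only stand-ins. Everything here is hypothetical: `planClient` replaces the generated gRPC client, `fakeDriver` replaces the Spark driver, and plans are plain strings rather than `proto.Plan` messages. The point is only that a DataFrame never owns a connection; it reaches the server through the session that created it, and failures come back as `error` values rather than panics.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// planClient is a hypothetical stand-in for the generated
// proto.SparkConnectServiceClient; plans are plain strings here.
type planClient interface {
	Execute(plan string) (string, error)
}

// fakeDriver plays the role of the Spark driver behind the gRPC client.
type fakeDriver struct{}

func (fakeDriver) Execute(plan string) (string, error) {
	if strings.TrimSpace(plan) == "" {
		return "", errors.New("empty plan")
	}
	return "result of: " + plan, nil
}

// session mirrors sparkSessionImpl: it owns the client and creates DataFrames.
type session struct {
	client planClient
}

func (s *session) Sql(query string) (*dataFrame, error) {
	return &dataFrame{session: s, plan: query}, nil
}

// dataFrame mirrors dataFrameImpl: it holds only a plan and a pointer back
// to the session whose client it uses.
type dataFrame struct {
	session *session
	plan    string
}

func (df *dataFrame) Collect() (string, error) {
	out, err := df.session.client.Execute(df.plan)
	if err != nil {
		// Failures travel back as wrapped error values, not panics.
		return "", fmt.Errorf("failed to execute plan: %w", err)
	}
	return out, nil
}

func main() {
	s := &session{client: fakeDriver{}}
	df, err := s.Sql("select 1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	out, err := df.Collect()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out) // result of: select 1
}
```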
diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
new file mode 100644
index 0000000..64e7646
--- /dev/null
+++ b/client/sql/dataframe.go
@@ -0,0 +1,282 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"github.com/apache/arrow/go/v12/arrow"
+	"github.com/apache/arrow/go/v12/arrow/array"
+	"github.com/apache/arrow/go/v12/arrow/ipc"
+	proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+	"io"
+)
+
+type DataFrame interface {
+	Show(numRows int, truncate bool) error
+	Schema() (*StructType, error)
+	Collect() ([]Row, error)
+}
+
+type dataFrameImpl struct {
+	sparkSession *sparkSessionImpl
+	relation     *proto.Relation // TODO change to proto.Plan?
+}
+
+func (df *dataFrameImpl) Show(numRows int, truncate bool) error {
+	truncateValue := 0
+	if truncate {
+		truncateValue = 20
+	}
+	vertical := false
+
+	plan := &proto.Plan{
+		OpType: &proto.Plan_Root{
+			Root: &proto.Relation{
+				Common: &proto.RelationCommon{
+					PlanId: newPlanId(),
+				},
+				RelType: &proto.Relation_ShowString{
+					ShowString: &proto.ShowString{
+						Input:    df.relation,
+						NumRows:  int32(numRows),
+						Truncate: int32(truncateValue),
+						Vertical: vertical,
+					},
+				},
+			},
+		},
+	}
+
+	responseClient, err := df.sparkSession.executePlan(plan)
+	if err != nil {
+		return fmt.Errorf("failed to show dataframe: %w", err)
+	}
+
+	for {
+		response, err := responseClient.Recv()
+		if err != nil {
+			return fmt.Errorf("failed to receive show response: %w", err)
+		}
+		arrowBatch := response.GetArrowBatch()
+		if arrowBatch == nil {
+			continue
+		}
+		err = showArrowBatch(arrowBatch)
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+
+	return fmt.Errorf("did not get arrow batch in response")
+}
+
+func (df *dataFrameImpl) Schema() (*StructType, error) {
+	response, err := df.sparkSession.analyzePlan(df.createPlan())
+	if err != nil {
+		return nil, fmt.Errorf("failed to analyze plan: %w", err)
+	}
+
+	responseSchema := response.GetSchema().Schema
+	result := convertProtoDataTypeToStructType(responseSchema)
+	return result, nil
+}
+
+func (df *dataFrameImpl) Collect() ([]Row, error) {
+	responseClient, err := df.sparkSession.executePlan(df.createPlan())
+	if err != nil {
+		return nil, fmt.Errorf("failed to execute plan: %w", err)
+	}
+
+	var schema *StructType
+	var allRows []Row
+
+	for {
+		response, err := responseClient.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				return allRows, nil
+			} else {
+				return nil, fmt.Errorf("failed to receive plan execution response: %w", err)
+			}
+		}
+
+		dataType := response.GetSchema()
+		if dataType != nil {
+			schema = convertProtoDataTypeToStructType(dataType)
+			continue
+		}
+
+		arrowBatch := response.GetArrowBatch()
+		if arrowBatch == nil {
+			continue
+		}
+
+		rowBatch, err := readArrowBatchData(arrowBatch.Data, schema)
+		if err != nil {
+			return nil, err
+		}
+
+		if allRows == nil {
+			allRows = make([]Row, 0, len(rowBatch))
+		}
+		allRows = append(allRows, rowBatch...)
+	}
+
+	return allRows, nil
+}
+
+func (df *dataFrameImpl) createPlan() *proto.Plan {
+	return &proto.Plan{
+		OpType: &proto.Plan_Root{
+			Root: &proto.Relation{
+				Common: &proto.RelationCommon{
+					PlanId: newPlanId(),
+				},
+				RelType: df.relation.RelType,
+			},
+		},
+	}
+}
+
+func showArrowBatch(arrowBatch *proto.ExecutePlanResponse_ArrowBatch) error {
+	return showArrowBatchData(arrowBatch.Data)
+}
+
+func showArrowBatchData(data []byte) error {
+	rows, err := readArrowBatchData(data, nil)
+	if err != nil {
+		return err
+	}
+	for _, row := range rows {
+		values, err := row.Values()
+		if err != nil {
+			return fmt.Errorf("failed to get values in the row: %w", err)
+		}
+		fmt.Println(values...)
+	}
+	return nil
+}
+
+func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
+	reader := bytes.NewReader(data)
+	arrowReader, err := ipc.NewReader(reader)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create arrow reader: %w", err)
+	}
+	defer arrowReader.Release()
+
+	var rows []Row
+
+	for {
+		record, err := arrowReader.Read()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				return rows, nil
+			} else {
+				return nil, fmt.Errorf("failed to read arrow: %w", err)
+			}
+		}
+		numColumns := len(arrowReader.Schema().Fields())
+		numRows := int(record.NumRows())
+		if rows == nil {
+			rows = make([]Row, 0, numRows)
+		}
+		values := make([][]any, numRows)
+		for i := range values {
+			values[i] = make([]any, numColumns)
+		}
+		for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
+			columnData := record.Column(columnIndex).Data()
+			dataTypeId := columnData.DataType().ID()
+			switch dataTypeId {
+			case arrow.STRING:
+				vector := array.NewStringData(columnData)
+				for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+					values[rowIndex][columnIndex] = vector.Value(rowIndex)
+				}
+			case arrow.INT32:
+				vector := array.NewInt32Data(columnData)
+				for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+					values[rowIndex][columnIndex] = vector.Value(rowIndex)
+				}
+			case arrow.INT64:
+				vector := array.NewInt64Data(columnData)
+				for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+					values[rowIndex][columnIndex] = vector.Value(rowIndex)
+				}
+			default:
+				return nil, fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeId.String(), columnIndex)
+			}
+		}
+
+		for _, v := range values {
+			row := &GenericRowWithSchema{
+				schema: schema,
+				values: v,
+			}
+			rows = append(rows, row)
+		}
+
+		hasNext := arrowReader.Next()
+		if !hasNext {
+			break
+		}
+	}
+
+	return rows, nil
+}
+
+func convertProtoDataTypeToStructType(input *proto.DataType) *StructType {
+	dataTypeStruct := input.GetStruct()
+	if dataTypeStruct == nil {
+		panic("dataType.GetStruct() is nil")
+	}
+	return &StructType{
+		Fields: convertProtoStructFields(dataTypeStruct.Fields),
+	}
+}
+
+func convertProtoStructFields(input []*proto.DataType_StructField) []StructField {
+	result := make([]StructField, len(input))
+	for i, f := range input {
+		result[i] = convertProtoStructField(f)
+	}
+	return result
+}
+
+func convertProtoStructField(field *proto.DataType_StructField) StructField {
+	return StructField{
+		Name:     field.Name,
+		DataType: convertProtoDataTypeToDataType(field.DataType),
+	}
+}
+
+func convertProtoDataTypeToDataType(input *proto.DataType) DataType {
+	switch v := input.GetKind().(type) {
+	case *proto.DataType_Integer_:
+		return IntegerType{}
+	case *proto.DataType_String_:
+		return StringType{}
+	default:
+		return UnsupportedType{
+			TypeInfo: v,
+		}
+	}
+}
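In `readArrowBatchData`, the loop calls `arrowReader.Read()` at the top and `arrowReader.Next()` at the bottom. As we read the Arrow Go v12 `ipc.Reader` API, each of those calls advances to the following record, so an IPC stream carrying several record batches would lose every second one, while a single-batch stream is unaffected. The stdlib-only sketch below uses a hypothetical `recordIterator` with the same advancing semantics to show the effect and the single-method loop that avoids it. With the real reader, the equivalent would be `for arrowReader.Next() { record := arrowReader.Record(); ... }` followed by a check of `arrowReader.Err()`.

```go
package main

import (
	"fmt"
	"io"
)

// recordIterator is a hypothetical stand-in that mimics how Arrow Go's
// ipc.Reader can be consumed: both Next() and Read() load the following record.
type recordIterator struct {
	records []string
	pos     int    // index of the next record to load
	cur     string // most recently loaded record
}

func (it *recordIterator) Next() bool {
	if it.pos >= len(it.records) {
		return false
	}
	it.cur = it.records[it.pos]
	it.pos++
	return true
}

func (it *recordIterator) Record() string { return it.cur }

func (it *recordIterator) Read() (string, error) {
	if !it.Next() {
		return "", io.EOF
	}
	return it.cur, nil
}

// mixedLoop reproduces the Read()-then-Next() shape of readArrowBatchData.
func mixedLoop(it *recordIterator) []string {
	var seen []string
	for {
		rec, err := it.Read()
		if err != nil { // io.EOF: end of stream
			return seen
		}
		seen = append(seen, rec)
		if !it.Next() { // advances a second time; the loaded record is never used
			return seen
		}
	}
}

// nextLoop consumes the iterator through Next()/Record() only.
func nextLoop(it *recordIterator) []string {
	var seen []string
	for it.Next() {
		seen = append(seen, it.Record())
	}
	return seen
}

func main() {
	records := []string{"r1", "r2", "r3", "r4"}
	fmt.Println(mixedLoop(&recordIterator{records: records})) // [r1 r3]
	fmt.Println(nextLoop(&recordIterator{records: records}))  // [r1 r2 r3 r4]
}
```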
diff --git a/client/sql/dataframe_test.go b/client/sql/dataframe_test.go
new file mode 100644
index 0000000..d8c3c80
--- /dev/null
+++ b/client/sql/dataframe_test.go
@@ -0,0 +1,57 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+	"bytes"
+	"github.com/apache/arrow/go/v12/arrow"
+	"github.com/apache/arrow/go/v12/arrow/array"
+	"github.com/apache/arrow/go/v12/arrow/ipc"
+	"github.com/apache/arrow/go/v12/arrow/memory"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"testing"
+)
+
+func TestShowArrowBatchData(t *testing.T) {
+	arrowFields := []arrow.Field{
+		{
+			Name: "show_string",
+			Type: &arrow.StringType{},
+		},
+	}
+	arrowSchema := arrow.NewSchema(arrowFields, nil)
+	var buf bytes.Buffer
+	arrowWriter := ipc.NewWriter(&buf, ipc.WithSchema(arrowSchema))
+	defer arrowWriter.Close()
+
+	alloc := memory.NewGoAllocator()
+	recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+	defer recordBuilder.Release()
+
+	recordBuilder.Field(0).(*array.StringBuilder).Append("str1a\nstr1b")
+	recordBuilder.Field(0).(*array.StringBuilder).Append("str2")
+
+	record := recordBuilder.NewRecord()
+	defer record.Release()
+
+	err := arrowWriter.Write(record)
+	require.Nil(t, err)
+
+	err = showArrowBatchData(buf.Bytes())
+	assert.Nil(t, err)
+}
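`TestShowArrowBatchData` exercises only the Arrow decoding path. The `Recv()` loops in `Show` and `Collect` could be tested without a running server by driving them from a fake stream. The sketch below is hypothetical throughout (`response`, `fakeStream`, `newStream` and `drain` are not part of this commit); it follows the loop shape `Collect` uses, where `io.EOF` is the normal end of a server stream and any other error is a failure to report.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// response is a hypothetical stand-in for proto.ExecutePlanResponse: each
// message carries a schema, an Arrow batch, or neither.
type response struct {
	schema     string
	arrowBatch []byte
}

// fakeStream mimics the Recv() method of a gRPC server-streaming client.
type fakeStream struct {
	msgs []*response
	err  error // returned once msgs run out; io.EOF signals a clean end
}

func (s *fakeStream) Recv() (*response, error) {
	if len(s.msgs) == 0 {
		return nil, s.err
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

// newStream returns a stream that ends cleanly with io.EOF.
func newStream(msgs ...*response) *fakeStream {
	return &fakeStream{msgs: msgs, err: io.EOF}
}

var errConnReset = errors.New("connection reset")

// drain reads until the stream ends. io.EOF is the normal end of stream;
// any other error is a transport failure and is wrapped.
func drain(s *fakeStream) (string, [][]byte, error) {
	var schema string
	var batches [][]byte
	for {
		resp, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return schema, batches, nil
		}
		if err != nil {
			return "", nil, fmt.Errorf("failed to receive response: %w", err)
		}
		if resp.schema != "" {
			schema = resp.schema
			continue
		}
		if resp.arrowBatch != nil {
			batches = append(batches, resp.arrowBatch)
		}
	}
}

func main() {
	schema, batches, err := drain(newStream(
		&response{schema: "word:string,count:int"},
		&response{arrowBatch: []byte{1}},
		&response{arrowBatch: []byte{2}},
	))
	fmt.Println(schema, len(batches), err) // word:string,count:int 2 <nil>

	_, _, err = drain(&fakeStream{err: errConnReset})
	fmt.Println(err) // failed to receive response: connection reset
}
```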
diff --git a/client/sql/datatype.go b/client/sql/datatype.go
new file mode 100644
index 0000000..05ab02e
--- /dev/null
+++ b/client/sql/datatype.go
@@ -0,0 +1,63 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+	"reflect"
+	"strings"
+)
+
+type DataType interface {
+	TypeName() string
+}
+
+type IntegerType struct {
+}
+
+func (t IntegerType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type StringType struct {
+}
+
+func (t StringType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type UnsupportedType struct {
+	TypeInfo any
+}
+
+func (t UnsupportedType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+func getDataTypeName(dataType DataType) string {
+	t := reflect.TypeOf(dataType)
+	if t == nil {
+		return "(nil)"
+	}
+	var name string
+	if t.Kind() == reflect.Ptr {
+		name = t.Elem().Name()
+	} else {
+		name = t.Name()
+	}
+	name = strings.TrimSuffix(name, "Type")
+	return name
+}
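`getDataTypeName` derives a display name by reflection: it dereferences pointer types and strips a trailing `Type` from the Go type name. The self-contained copy below (the `DataType` types are redeclared locally so it compiles on its own) shows what the helper returns in each case, including a pointer and a nil interface.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

type DataType interface{ TypeName() string }

type IntegerType struct{}

func (t IntegerType) TypeName() string { return getDataTypeName(t) }

type StringType struct{}

func (t StringType) TypeName() string { return getDataTypeName(t) }

type UnsupportedType struct{ TypeInfo any }

func (t UnsupportedType) TypeName() string { return getDataTypeName(t) }

// getDataTypeName is copied from datatype.go above.
func getDataTypeName(dataType DataType) string {
	t := reflect.TypeOf(dataType)
	if t == nil {
		return "(nil)" // a nil interface has no dynamic type
	}
	var name string
	if t.Kind() == reflect.Ptr {
		name = t.Elem().Name() // *StringType -> StringType
	} else {
		name = t.Name()
	}
	return strings.TrimSuffix(name, "Type")
}

func main() {
	fmt.Println(IntegerType{}.TypeName())       // Integer
	fmt.Println(StringType{}.TypeName())        // String
	fmt.Println(UnsupportedType{}.TypeName())   // Unsupported
	fmt.Println(getDataTypeName(&StringType{})) // String
	fmt.Println(getDataTypeName(nil))           // (nil)
}
```

Because the name comes from the Go identifier, renaming a struct would silently change its `TypeName()`; returning a fixed string from each type's method is an alternative that avoids that coupling.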
diff --git a/client/sql/plan.go b/client/sql/plan.go
new file mode 100644
index 0000000..66b9e05
--- /dev/null
+++ b/client/sql/plan.go
@@ -0,0 +1,26 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import "sync/atomic"
+
+var atomicInt64 atomic.Int64
+
+func newPlanId() *int64 {
+	v := atomicInt64.Add(1)
+	return &v
+}
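`newPlanId` hands out plan IDs from a package-level `atomic.Int64` (available since Go 1.19) and returns `*int64`, since the generated `RelationCommon.PlanId` field is assigned a pointer. A quick concurrency check, reusing the same two declarations plus a hypothetical `collectIds` helper, confirms that IDs stay unique across goroutines and that each call returns a fresh pointer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Same pattern as plan.go: a package-level counter shared by all callers.
var atomicInt64 atomic.Int64

func newPlanId() *int64 {
	v := atomicInt64.Add(1) // v is a fresh local, so each call returns a distinct pointer
	return &v
}

// collectIds calls newPlanId from n goroutines and gathers the results.
func collectIds(n int) map[int64]bool {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		ids = make(map[int64]bool, n)
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			id := *newPlanId()
			mu.Lock()
			ids[id] = true
			mu.Unlock()
		}()
	}
	wg.Wait()
	return ids
}

func main() {
	ids := collectIds(1000)
	fmt.Println(len(ids)) // 1000: no two concurrent callers received the same ID
}
```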
diff --git a/client/sql/row.go b/client/sql/row.go
new file mode 100644
index 0000000..3bee2ac
--- /dev/null
+++ b/client/sql/row.go
@@ -0,0 +1,35 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+type Row interface {
+	Schema() (*StructType, error)
+	Values() ([]any, error)
+}
+
+type GenericRowWithSchema struct {
+	values []any
+	schema *StructType
+}
+
+func (r *GenericRowWithSchema) Schema() (*StructType, error) {
+	return r.schema, nil
+}
+
+func (r *GenericRowWithSchema) Values() ([]any, error) {
+	return r.values, nil
+}
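`Row` exposes its schema and its values separately, so a caller who wants a value by column name has to pair them by position. A hypothetical helper, `valueByName` (not part of this commit), sketches that lookup using local copies of `StructType`, `StructField`, `Row` and `GenericRowWithSchema`. It returns an error when the schema is nil, which is the case for rows built by `showArrowBatchData`, since that function passes a nil schema to `readArrowBatchData`.

```go
package main

import "fmt"

// Minimal local copies of the types from structtype.go and row.go.
type StructField struct {
	Name string
}

type StructType struct {
	Fields []StructField
}

type Row interface {
	Schema() (*StructType, error)
	Values() ([]any, error)
}

type GenericRowWithSchema struct {
	values []any
	schema *StructType
}

func (r *GenericRowWithSchema) Schema() (*StructType, error) { return r.schema, nil }
func (r *GenericRowWithSchema) Values() ([]any, error)        { return r.values, nil }

// valueByName is a hypothetical helper that pairs the schema's field order
// with the row's positional values.
func valueByName(r Row, name string) (any, error) {
	schema, err := r.Schema()
	if err != nil {
		return nil, err
	}
	if schema == nil {
		return nil, fmt.Errorf("row has no schema")
	}
	values, err := r.Values()
	if err != nil {
		return nil, err
	}
	for i, f := range schema.Fields {
		if f.Name == name && i < len(values) {
			return values[i], nil
		}
	}
	return nil, fmt.Errorf("no field named %q", name)
}

func main() {
	schema := &StructType{Fields: []StructField{{Name: "word"}, {Name: "count"}}}
	row := &GenericRowWithSchema{values: []any{"apple", int32(123)}, schema: schema}
	v, err := valueByName(row, "count")
	fmt.Println(v, err) // 123 <nil>
}
```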
diff --git a/client/sql/sparksession.go b/client/sql/sparksession.go
new file mode 100644
index 0000000..86f4cd1
--- /dev/null
+++ b/client/sql/sparksession.go
@@ -0,0 +1,134 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+	"context"
+	"fmt"
+	proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+	"github.com/google/uuid"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+var SparkSession sparkSessionBuilderEntrypoint
+
+type sparkSession interface {
+	Sql(query string) (DataFrame, error)
+	Stop() error
+}
+
+type sparkSessionBuilderEntrypoint struct {
+	Builder SparkSessionBuilder
+}
+
+type SparkSessionBuilder struct {
+	connectionString string
+}
+
+func (s SparkSessionBuilder) Remote(connectionString string) SparkSessionBuilder {
+	copy := s
+	copy.connectionString = connectionString
+	return copy
+}
+
+func (s SparkSessionBuilder) Build() (sparkSession, error) {
+	opts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	}
+
+	conn, err := grpc.Dial(s.connectionString, opts...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to connect to remote %s: %w", s.connectionString, err)
+	}
+
+	client := proto.NewSparkConnectServiceClient(conn)
+	return &sparkSessionImpl{
+		sessionId: uuid.NewString(),
+		client:    client,
+	}, nil
+}
+
+type sparkSessionImpl struct {
+	sessionId string
+	client    proto.SparkConnectServiceClient
+}
+
+func (s *sparkSessionImpl) Sql(query string) (DataFrame, error) {
+	plan := &proto.Plan{
+		OpType: &proto.Plan_Command{
+			Command: &proto.Command{
+				CommandType: &proto.Command_SqlCommand{
+					SqlCommand: &proto.SqlCommand{
+						Sql: query,
+					},
+				},
+			},
+		},
+	}
+	responseClient, err := s.executePlan(plan)
+	if err != nil {
+		return nil, fmt.Errorf("failed to execute sql: %s: %w", query, err)
+	}
+	for {
+		response, err := responseClient.Recv()
+		if err != nil {
+			return nil, fmt.Errorf("failed to receive ExecutePlan response: %w", err)
+		}
+		sqlCommandResult := response.GetSqlCommandResult()
+		if sqlCommandResult == nil {
+			continue
+		}
+		return &dataFrameImpl{
+			sparkSession: s,
+			relation:     sqlCommandResult.GetRelation(),
+		}, nil
+	}
+	return nil, fmt.Errorf("failed to get SqlCommandResult in ExecutePlan response")
+}
+
+func (s *sparkSessionImpl) Stop() error {
+	return nil
+}
+
+func (s *sparkSessionImpl) executePlan(plan *proto.Plan) (proto.SparkConnectService_ExecutePlanClient, error) {
+	request := proto.ExecutePlanRequest{
+		SessionId: s.sessionId,
+		Plan:      plan,
+	}
+	executePlanClient, err := s.client.ExecutePlan(context.TODO(), &request)
+	if err != nil {
+		return nil, fmt.Errorf("failed to call ExecutePlan in session %s: %w", s.sessionId, err)
+	}
+	return executePlanClient, nil
+}
+
+func (s *sparkSessionImpl) analyzePlan(plan *proto.Plan) (*proto.AnalyzePlanResponse, error) {
+	request := proto.AnalyzePlanRequest{
+		SessionId: s.sessionId,
+		Analyze: &proto.AnalyzePlanRequest_Schema_{
+			Schema: &proto.AnalyzePlanRequest_Schema{
+				Plan: plan,
+			},
+		},
+	}
+	response, err := s.client.AnalyzePlan(context.TODO(), &request)
+	if err != nil {
+		return nil, fmt.Errorf("failed to call AnalyzePlan in session %s: %w", s.sessionId, err)
+	}
+	return response, nil
+}
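`SparkSessionBuilder` uses value receivers, so `Remote` returns a modified copy and the package-level `sql.SparkSession.Builder` is never mutated. `Build` dials the server, but `sparkSessionImpl` keeps only the generated client, so `Stop()` has nothing to close and the connection stays open. The sketch below keeps the connection as an `io.Closer` and closes it in `Stop`. `*grpc.ClientConn` has a `Close() error` method and so satisfies `io.Closer`; here it is replaced by a hypothetical `fakeConn`, and `builder` and `session` are simplified, hypothetical versions of the types above.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// builder mirrors SparkSessionBuilder: value receivers, so Remote returns a
// modified copy and never mutates the shared entry point.
type builder struct {
	connectionString string
}

func (b builder) Remote(cs string) builder {
	b.connectionString = cs // b is already a copy
	return b
}

// fakeConn is a hypothetical stand-in for *grpc.ClientConn.
type fakeConn struct{ closed bool }

func (c *fakeConn) Close() error {
	if c.closed {
		return errors.New("connection already closed")
	}
	c.closed = true
	return nil
}

func (b builder) Build() (*session, error) {
	if b.connectionString == "" {
		return nil, errors.New("remote is not set")
	}
	return &session{conn: &fakeConn{}}, nil
}

// session keeps the connection so that Stop can release it.
type session struct {
	conn io.Closer
}

func (s *session) Stop() error {
	return s.conn.Close()
}

func main() {
	var entry builder // like sql.SparkSession.Builder: a zero value
	b := entry.Remote("localhost:15002")
	fmt.Printf("%q %q\n", entry.connectionString, b.connectionString) // "" "localhost:15002"

	s, err := b.Build()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.Stop()) // <nil>
}
```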
diff --git a/client/sql/structtype.go b/client/sql/structtype.go
new file mode 100644
index 0000000..2a59c30
--- /dev/null
+++ b/client/sql/structtype.go
@@ -0,0 +1,28 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+type StructField struct {
+	Name     string
+	DataType DataType
+	Nullable bool // default should be true
+}
+
+type StructType struct {
+	TypeName string
+	Fields   []StructField
+}
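The `// default should be true` comment on `Nullable` cannot be enforced by a struct literal, because Go's zero value for `bool` is `false`. Note also that `convertProtoStructField` in `dataframe.go` copies only `Name` and `DataType`, so converted fields currently report `Nullable == false`. One option, sketched with a hypothetical `NewStructField` constructor (not part of this commit), is to apply Spark's default of `nullable = true` in a constructor.

```go
package main

import "fmt"

type DataType interface{ TypeName() string }

type StringType struct{}

func (StringType) TypeName() string { return "String" }

type StructField struct {
	Name     string
	DataType DataType
	Nullable bool // Go's zero value is false, while Spark fields default to nullable
}

// NewStructField is a hypothetical constructor that applies the
// "default should be true" rule from the comment in structtype.go.
func NewStructField(name string, dataType DataType) StructField {
	return StructField{Name: name, DataType: dataType, Nullable: true}
}

func main() {
	literal := StructField{Name: "word", DataType: StringType{}}
	built := NewStructField("word", StringType{})
	fmt.Println(literal.Nullable, built.Nullable) // false true
}
```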
diff --git a/cmd/spark-connect-example-raw-grpc-client/main.go b/cmd/spark-connect-example-raw-grpc-client/main.go
new file mode 100644
index 0000000..8398292
--- /dev/null
+++ b/cmd/spark-connect-example-raw-grpc-client/main.go
@@ -0,0 +1,64 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"flag"
+	proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+	"github.com/google/uuid"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"log"
+	"time"
+)
+
+var (
+	remote = flag.String("remote", "localhost:15002", "the remote address of Spark Connect server to connect to")
+)
+
+func main() {
+	opts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	}
+
+	conn, err := grpc.Dial(*remote, opts...)
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+	defer conn.Close()
+
+	client := proto.NewSparkConnectServiceClient(conn)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	configRequest := proto.ConfigRequest{
+		SessionId: uuid.NewString(),
+		Operation: &proto.ConfigRequest_Operation{
+			OpType: &proto.ConfigRequest_Operation_GetAll{
+				GetAll: &proto.ConfigRequest_GetAll{},
+			},
+		},
+	}
+	configResponse, err := client.Config(ctx, &configRequest)
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	log.Printf("configResponse: %v", configResponse)
+}
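Both example programs declare `remote` with `flag.String` but never call `flag.Parse()`, so as written the `-remote` flag is ignored and the default `localhost:15002` is always used. Calling `flag.Parse()` at the top of `main` would fix that; the sketch below instead parses into a dedicated `flag.FlagSet` inside a hypothetical `parseRemote` helper, which keeps the logic testable.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// parseRemote defines the same -remote flag on its own FlagSet and parses args.
func parseRemote(args []string) (string, error) {
	fs := flag.NewFlagSet("spark-connect-example", flag.ContinueOnError)
	remote := fs.String("remote", "localhost:15002",
		"the remote address of Spark Connect server to connect to")
	if err := fs.Parse(args); err != nil {
		return "", err // the FlagSet has already printed the error and usage
	}
	return *remote, nil
}

func main() {
	remote, err := parseRemote(os.Args[1:])
	if err != nil {
		os.Exit(2) // same exit code flag.ExitOnError uses
	}
	fmt.Println("connecting to", remote)
}
```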
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
new file mode 100644
index 0000000..0f4a6cc
--- /dev/null
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -0,0 +1,73 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"flag"
+	"github.com/apache/spark-connect-go/v_3_4/client/sql"
+	"log"
+)
+
+var (
+	remote = flag.String("remote", "localhost:15002",
+		"the remote address of Spark Connect server to connect to")
+)
+
+func main() {
+	spark, err := sql.SparkSession.Builder.Remote(*remote).Build()
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+	defer spark.Stop()
+
+	df, err := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	err = df.Show(100, false)
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	schema, err := df.Schema()
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	for _, f := range schema.Fields {
+		log.Printf("Field in dataframe schema: %s - %s", f.Name, f.DataType.TypeName())
+	}
+
+	rows, err := df.Collect()
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	schema, err = rows[0].Schema()
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	for _, f := range schema.Fields {
+		log.Printf("Field in row: %s - %s", f.Name, f.DataType.TypeName())
+	}
+
+	for _, row := range rows {
+		log.Printf("Row: %v", row)
+	}
+}
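Two details at the end of this example could be hardened: `rows[0]` panics if the query returns no rows, and `log.Printf("Row: %v", row)` prints the `*GenericRowWithSchema` struct itself (e.g. `&{[apple 123] 0x...}`, where the schema appears only as a pointer address) rather than the row's values. The hypothetical `formatRows` helper below, with a local `Row` interface and a `sliceRow` type standing in for the client's types, formats each row through `Values()` and reports an empty result as an error.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type Row interface {
	Values() ([]any, error)
}

// sliceRow is a hypothetical Row used only for this sketch.
type sliceRow []any

func (r sliceRow) Values() ([]any, error) { return r, nil }

// formatRows renders each row's values, one row per line, and treats an
// empty result as an error instead of indexing rows[0].
func formatRows(rows []Row) (string, error) {
	if len(rows) == 0 {
		return "", errors.New("query returned no rows")
	}
	var sb strings.Builder
	for _, row := range rows {
		values, err := row.Values()
		if err != nil {
			return "", fmt.Errorf("failed to get row values: %w", err)
		}
		sb.WriteString(fmt.Sprintln(values...)) // space-separated, newline-terminated
	}
	return sb.String(), nil
}

func main() {
	rows := []Row{sliceRow{"apple", int32(123)}, sliceRow{"orange", int32(456)}}
	out, err := formatRows(rows)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out)
	// apple 123
	// orange 456
}
```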
diff --git a/go.sum b/go.sum
index 7fb61d6..f8c434e 100644
--- a/go.sum
+++ b/go.sum
@@ -1,40 +1,66 @@
+github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
+github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
 github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
+github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2OK5cmc=
 github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg=
+github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
 github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
 github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM=
 github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
 github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
+github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
 github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
+github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
 github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
+github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
 github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
+github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
 github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
 golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
 golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
@@ -43,11 +69,14 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
 golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
 golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0=
 golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
+gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
 google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w=
 google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
 google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
@@ -57,5 +86,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
 google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
 google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
