Posted to commits@spark.apache.org by ru...@apache.org on 2023/06/15 01:09:01 UTC

[spark-connect-go] branch master updated: [SPARK-43351] Add DataFrame writer and reader prototype code

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ddc6a5  [SPARK-43351] Add DataFrame writer and reader prototype code
3ddc6a5 is described below

commit 3ddc6a5fd71e86d3ee558e92830e8ea47ea41b55
Author: hiboyang <14...@users.noreply.github.com>
AuthorDate: Thu Jun 15 09:08:41 2023 +0800

    [SPARK-43351] Add DataFrame writer and reader prototype code
    
    ### What changes were proposed in this pull request?
    
    Add a DataFrame writer and reader for the Spark Connect Go client.
    
    JIRA: https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-43351
    
    ### Why are the changes needed?
    
    This adds more implementation to the Spark Connect Go client prototype, supporting a DataFrame writer and reader.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will be able to use the Spark Connect Go client to write and read DataFrames, like the following:
    ```
            df.Write().Mode("overwrite").
                    Format("parquet").
                    Save("file:///tmp/spark-connect-write-example-output.parquet")
    
            df, _ = spark.Read().Format("parquet").
                    Load("file:///tmp/spark-connect-write-example-output.parquet")
    ```
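    
    A minimal sketch with error handling, mirroring the updated example `main.go` (it assumes the `spark` session and `df` DataFrame created earlier in that file, plus the `log` package):
    ```
    	// Write the DataFrame out as parquet, overwriting any existing output.
    	err = df.Write().Mode("overwrite").
    		Format("parquet").
    		Save("file:///tmp/spark-connect-write-example-output.parquet")
    	if err != nil {
    		log.Fatalf("Failed to write data frame: %s", err.Error())
    	}
    
    	// Read the same parquet output back into a new DataFrame.
    	df, err = spark.Read().Format("parquet").
    		Load("file:///tmp/spark-connect-write-example-output.parquet")
    	if err != nil {
    		log.Fatalf("Failed to read data frame: %s", err.Error())
    	}
    
    	// Show also returns an error; check it instead of discarding it.
    	if err := df.Show(100, false); err != nil {
    		log.Fatalf("Failed to show data frame: %s", err.Error())
    	}
    ```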
    
    ### How was this patch tested?
    
    Tested by manually running `cmd/spark-connect-example-spark-session/main.go`.
    
    Closes #10 from hiboyang/bo-dev-02.
    
    Lead-authored-by: hiboyang <14...@users.noreply.github.com>
    Co-authored-by: Bo Yang <14...@users.noreply.github.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 client/sql/dataframe.go                         | 15 +++++
 client/sql/dataframereader.go                   | 47 ++++++++++++++
 client/sql/dataframewriter.go                   | 84 +++++++++++++++++++++++++
 client/sql/dataframewriter_test.go              | 33 ++++++++++
 client/sql/sparksession.go                      | 27 ++++++++
 cmd/spark-connect-example-spark-session/main.go | 15 +++++
 6 files changed, 221 insertions(+)

diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
index 64e7646..eb1718a 100644
--- a/client/sql/dataframe.go
+++ b/client/sql/dataframe.go
@@ -27,12 +27,19 @@ import (
 	"io"
 )
 
+// DataFrame is a wrapper for a data frame, representing a distributed collection of data rows.
 type DataFrame interface {
+	// Show prints out data frame data.
 	Show(numRows int, truncate bool) error
+	// Schema returns the schema for the current data frame.
 	Schema() (*StructType, error)
+	// Collect returns the data rows of the current data frame.
 	Collect() ([]Row, error)
+	// Write returns a data frame writer, which can be used to save the data frame to supported storage.
+	Write() DataFrameWriter
 }
 
+// dataFrameImpl is an implementation of DataFrame interface.
 type dataFrameImpl struct {
 	sparkSession *sparkSessionImpl
 	relation     *proto.Relation // TODO change to proto.Plan?
@@ -142,6 +149,14 @@ func (df *dataFrameImpl) Collect() ([]Row, error) {
 	return allRows, nil
 }
 
+func (df *dataFrameImpl) Write() DataFrameWriter {
+	writer := dataFrameWriterImpl{
+		sparkSession: df.sparkSession,
+		relation:     df.relation,
+	}
+	return &writer
+}
+
 func (df *dataFrameImpl) createPlan() *proto.Plan {
 	return &proto.Plan{
 		OpType: &proto.Plan_Root{
diff --git a/client/sql/dataframereader.go b/client/sql/dataframereader.go
new file mode 100644
index 0000000..71c4e79
--- /dev/null
+++ b/client/sql/dataframereader.go
@@ -0,0 +1,47 @@
+package sql
+
+import proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+
+// DataFrameReader supports reading data from storage and returning a data frame.
+// TODO needs to implement other methods like Option(), Schema(), and also "strongly typed"
+// reading (e.g. Parquet(), Orc(), Csv(), etc.).
+type DataFrameReader interface {
+	// Format specifies data format (data source type) for the underlying data, e.g. parquet.
+	Format(source string) DataFrameReader
+	// Load reads the underlying data and returns a data frame.
+	Load(path string) (DataFrame, error)
+}
+
+// dataFrameReaderImpl is an implementation of DataFrameReader interface.
+type dataFrameReaderImpl struct {
+	sparkSession *sparkSessionImpl
+	formatSource string
+}
+
+func (w *dataFrameReaderImpl) Format(source string) DataFrameReader {
+	w.formatSource = source
+	return w
+}
+
+func (w *dataFrameReaderImpl) Load(path string) (DataFrame, error) {
+	var format *string
+	if w.formatSource != "" {
+		format = &w.formatSource
+	}
+	df := &dataFrameImpl{
+		sparkSession: w.sparkSession,
+		relation: &proto.Relation{
+			RelType: &proto.Relation_Read{
+				Read: &proto.Read{
+					ReadType: &proto.Read_DataSource_{
+						DataSource: &proto.Read_DataSource{
+							Format: format,
+							Paths:  []string{path},
+						},
+					},
+				},
+			},
+		},
+	}
+	return df, nil
+}
diff --git a/client/sql/dataframewriter.go b/client/sql/dataframewriter.go
new file mode 100644
index 0000000..868ae39
--- /dev/null
+++ b/client/sql/dataframewriter.go
@@ -0,0 +1,84 @@
+package sql
+
+import (
+	"fmt"
+	proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+	"strings"
+)
+
+// DataFrameWriter supports writing a data frame to storage.
+type DataFrameWriter interface {
+	// Mode specifies saving mode for the data, e.g. Append, Overwrite, ErrorIfExists.
+	Mode(saveMode string) DataFrameWriter
+	// Format specifies data format (data source type) for the underlying data, e.g. parquet.
+	Format(source string) DataFrameWriter
+	// Save writes data frame to the given path.
+	Save(path string) error
+}
+
+// dataFrameWriterImpl is an implementation of DataFrameWriter interface.
+type dataFrameWriterImpl struct {
+	sparkSession *sparkSessionImpl
+	relation     *proto.Relation
+	saveMode     string
+	formatSource string
+}
+
+func (w *dataFrameWriterImpl) Mode(saveMode string) DataFrameWriter {
+	w.saveMode = saveMode
+	return w
+}
+
+func (w *dataFrameWriterImpl) Format(source string) DataFrameWriter {
+	w.formatSource = source
+	return w
+}
+
+func (w *dataFrameWriterImpl) Save(path string) error {
+	saveMode, err := getSaveMode(w.saveMode)
+	if err != nil {
+		return err
+	}
+	var source *string
+	if w.formatSource != "" {
+		source = &w.formatSource
+	}
+	plan := &proto.Plan{
+		OpType: &proto.Plan_Command{
+			Command: &proto.Command{
+				CommandType: &proto.Command_WriteOperation{
+					WriteOperation: &proto.WriteOperation{
+						Input:  w.relation,
+						Mode:   saveMode,
+						Source: source,
+						SaveType: &proto.WriteOperation_Path{
+							Path: path,
+						},
+					},
+				},
+			},
+		},
+	}
+	responseClient, err := w.sparkSession.executePlan(plan)
+	if err != nil {
+		return err
+	}
+
+	return consumeExecutePlanClient(responseClient)
+}
+
+func getSaveMode(mode string) (proto.WriteOperation_SaveMode, error) {
+	if mode == "" {
+		return proto.WriteOperation_SAVE_MODE_UNSPECIFIED, nil
+	} else if strings.EqualFold(mode, "Append") {
+		return proto.WriteOperation_SAVE_MODE_APPEND, nil
+	} else if strings.EqualFold(mode, "Overwrite") {
+		return proto.WriteOperation_SAVE_MODE_OVERWRITE, nil
+	} else if strings.EqualFold(mode, "ErrorIfExists") {
+		return proto.WriteOperation_SAVE_MODE_ERROR_IF_EXISTS, nil
+	} else if strings.EqualFold(mode, "Ignore") {
+		return proto.WriteOperation_SAVE_MODE_IGNORE, nil
+	} else {
+		return 0, fmt.Errorf("unsupported save mode: %s", mode)
+	}
+}
diff --git a/client/sql/dataframewriter_test.go b/client/sql/dataframewriter_test.go
new file mode 100644
index 0000000..6521d57
--- /dev/null
+++ b/client/sql/dataframewriter_test.go
@@ -0,0 +1,33 @@
+package sql
+
+import (
+	proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestGetSaveMode(t *testing.T) {
+	mode, err := getSaveMode("")
+	assert.Nil(t, err)
+	assert.Equal(t, proto.WriteOperation_SAVE_MODE_UNSPECIFIED, mode)
+
+	mode, err = getSaveMode("append")
+	assert.Nil(t, err)
+	assert.Equal(t, proto.WriteOperation_SAVE_MODE_APPEND, mode)
+
+	mode, err = getSaveMode("Overwrite")
+	assert.Nil(t, err)
+	assert.Equal(t, proto.WriteOperation_SAVE_MODE_OVERWRITE, mode)
+
+	mode, err = getSaveMode("ErrorIfExists")
+	assert.Nil(t, err)
+	assert.Equal(t, proto.WriteOperation_SAVE_MODE_ERROR_IF_EXISTS, mode)
+
+	mode, err = getSaveMode("IGNORE")
+	assert.Nil(t, err)
+	assert.Equal(t, proto.WriteOperation_SAVE_MODE_IGNORE, mode)
+
+	mode, err = getSaveMode("XYZ")
+	assert.NotNil(t, err)
+	assert.Equal(t, proto.WriteOperation_SAVE_MODE_UNSPECIFIED, mode)
+}
diff --git a/client/sql/sparksession.go b/client/sql/sparksession.go
index 6a3ca8c..3fa05cd 100644
--- a/client/sql/sparksession.go
+++ b/client/sql/sparksession.go
@@ -18,17 +18,20 @@ package sql
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
 	"github.com/apache/spark-connect-go/v_3_4/client/channel"
 	proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
 	"github.com/google/uuid"
 	"google.golang.org/grpc/metadata"
+	"io"
 )
 
 var SparkSession sparkSessionBuilderEntrypoint
 
 type sparkSession interface {
+	Read() DataFrameReader
 	Sql(query string) (DataFrame, error)
 	Stop() error
 }
@@ -79,6 +82,12 @@ type sparkSessionImpl struct {
 	metadata  metadata.MD
 }
 
+func (s *sparkSessionImpl) Read() DataFrameReader {
+	return &dataFrameReaderImpl{
+		sparkSession: s,
+	}
+}
+
 func (s *sparkSessionImpl) Sql(query string) (DataFrame, error) {
 	plan := &proto.Plan{
 		OpType: &proto.Plan_Command{
@@ -154,3 +163,21 @@ func (s *sparkSessionImpl) analyzePlan(plan *proto.Plan) (*proto.AnalyzePlanResp
 	}
 	return response, nil
 }
+
+// consumeExecutePlanClient reads through the gRPC stream returned by the Spark Connect driver and
+// discards the returned data if there is no error. Consuming the response stream is necessary when
+// saving a data frame, because the Spark Connect driver only actually performs the save while the
+// stream is being consumed; if the stream is not consumed, the data frame will not be saved.
+func consumeExecutePlanClient(responseClient proto.SparkConnectService_ExecutePlanClient) error {
+	for {
+		_, err := responseClient.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				return nil
+			} else {
+				return fmt.Errorf("failed to receive plan execution response: %w", err)
+			}
+		}
+	}
+	return nil
+}
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
index 0ea031b..6c0b5db 100644
--- a/cmd/spark-connect-example-spark-session/main.go
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -72,4 +72,19 @@ func main() {
 	for _, row := range rows {
 		log.Printf("Row: %v", row)
 	}
+
+	err = df.Write().Mode("overwrite").
+		Format("parquet").
+		Save("file:///tmp/spark-connect-write-example-output.parquet")
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	df, err = spark.Read().Format("parquet").
+		Load("file:///tmp/spark-connect-write-example-output.parquet")
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	df.Show(100, false)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org