You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2023/06/15 01:09:01 UTC
[spark-connect-go] branch master updated: [SPARK-43351] Add DataFrame writer and reader prototype code
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3ddc6a5 [SPARK-43351] Add DataFrame writer and reader prototype code
3ddc6a5 is described below
commit 3ddc6a5fd71e86d3ee558e92830e8ea47ea41b55
Author: hiboyang <14...@users.noreply.github.com>
AuthorDate: Thu Jun 15 09:08:41 2023 +0800
[SPARK-43351] Add DataFrame writer and reader prototype code
### What changes were proposed in this pull request?
Add a DataFrame writer and reader to the Spark Connect Go client.
JIRA: https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-43351
### Why are the changes needed?
This extends the Spark Connect Go client prototype with support for a DataFrame writer and reader.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to use the Spark Connect Go client to write and read DataFrames, as follows:
```
df.Write().Mode("overwrite").
Format("parquet").
Save("file:///tmp/spark-connect-write-example-output.parquet")
df, _ = spark.Read().Format("parquet").
Load("file:///tmp/spark-connect-write-example-output.parquet")
```
### How was this patch tested?
Tested by manually running `cmd/spark-connect-example-spark-session/main.go`.
Closes #10 from hiboyang/bo-dev-02.
Lead-authored-by: hiboyang <14...@users.noreply.github.com>
Co-authored-by: Bo Yang <14...@users.noreply.github.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
client/sql/dataframe.go | 15 +++++
client/sql/dataframereader.go | 47 ++++++++++++++
client/sql/dataframewriter.go | 84 +++++++++++++++++++++++++
client/sql/dataframewriter_test.go | 33 ++++++++++
client/sql/sparksession.go | 27 ++++++++
cmd/spark-connect-example-spark-session/main.go | 15 +++++
6 files changed, 221 insertions(+)
diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
index 64e7646..eb1718a 100644
--- a/client/sql/dataframe.go
+++ b/client/sql/dataframe.go
@@ -27,12 +27,19 @@ import (
"io"
)
+// DataFrame is a wrapper for data frame, representing a distributed collection of data row.
type DataFrame interface {
+ // Show prints out data frame data.
Show(numRows int, truncate bool) error
+ // Schema returns the schema for the current data frame.
Schema() (*StructType, error)
+ // Collect returns the data rows of the current data frame.
Collect() ([]Row, error)
+ // Write returns a data frame writer, which could be used to save data frame to supported storage.
+ Write() DataFrameWriter
}
+// dataFrameImpl is an implementation of DataFrame interface.
type dataFrameImpl struct {
sparkSession *sparkSessionImpl
relation *proto.Relation // TODO change to proto.Plan?
@@ -142,6 +149,14 @@ func (df *dataFrameImpl) Collect() ([]Row, error) {
return allRows, nil
}
+func (df *dataFrameImpl) Write() DataFrameWriter {
+ writer := dataFrameWriterImpl{
+ sparkSession: df.sparkSession,
+ relation: df.relation,
+ }
+ return &writer
+}
+
func (df *dataFrameImpl) createPlan() *proto.Plan {
return &proto.Plan{
OpType: &proto.Plan_Root{
diff --git a/client/sql/dataframereader.go b/client/sql/dataframereader.go
new file mode 100644
index 0000000..71c4e79
--- /dev/null
+++ b/client/sql/dataframereader.go
@@ -0,0 +1,47 @@
+package sql
+
+import proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+
+// DataFrameReader supports reading data from storage and returning a data frame.
+// TODO needs to implement other methods like Option(), Schema(), and also "strong typed"
+// reading (e.g. Parquet(), Orc(), Csv(), etc.
+type DataFrameReader interface {
+ // Format specifies data format (data source type) for the underlying data, e.g. parquet.
+ Format(source string) DataFrameReader
+ // Load reads the underlying data and returns a data frame.
+ Load(path string) (DataFrame, error)
+}
+
+// dataFrameReaderImpl is an implementation of DataFrameReader interface.
+type dataFrameReaderImpl struct {
+ sparkSession *sparkSessionImpl
+ formatSource string
+}
+
+func (w *dataFrameReaderImpl) Format(source string) DataFrameReader {
+ w.formatSource = source
+ return w
+}
+
+func (w *dataFrameReaderImpl) Load(path string) (DataFrame, error) {
+ var format *string
+ if w.formatSource != "" {
+ format = &w.formatSource
+ }
+ df := &dataFrameImpl{
+ sparkSession: w.sparkSession,
+ relation: &proto.Relation{
+ RelType: &proto.Relation_Read{
+ Read: &proto.Read{
+ ReadType: &proto.Read_DataSource_{
+ DataSource: &proto.Read_DataSource{
+ Format: format,
+ Paths: []string{path},
+ },
+ },
+ },
+ },
+ },
+ }
+ return df, nil
+}
diff --git a/client/sql/dataframewriter.go b/client/sql/dataframewriter.go
new file mode 100644
index 0000000..868ae39
--- /dev/null
+++ b/client/sql/dataframewriter.go
@@ -0,0 +1,84 @@
+package sql
+
+import (
+ "fmt"
+ proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+ "strings"
+)
+
// DataFrameWriter persists a data frame to storage. Configure it with Mode and
// Format, then call Save to perform the write.
type DataFrameWriter interface {
	// Mode sets how existing data at the destination is treated, for example
	// Append, Overwrite, ErrorIfExists or Ignore.
	Mode(saveMode string) DataFrameWriter
	// Format sets the data format (data source type) for the written data,
	// for example "parquet".
	Format(source string) DataFrameWriter
	// Save writes the data frame to the given path.
	Save(path string) error
}
+
+// dataFrameWriterImpl is an implementation of DataFrameWriter interface.
+type dataFrameWriterImpl struct {
+ sparkSession *sparkSessionImpl
+ relation *proto.Relation
+ saveMode string
+ formatSource string
+}
+
+func (w *dataFrameWriterImpl) Mode(saveMode string) DataFrameWriter {
+ w.saveMode = saveMode
+ return w
+}
+
+func (w *dataFrameWriterImpl) Format(source string) DataFrameWriter {
+ w.formatSource = source
+ return w
+}
+
+func (w *dataFrameWriterImpl) Save(path string) error {
+ saveMode, err := getSaveMode(w.saveMode)
+ if err != nil {
+ return err
+ }
+ var source *string
+ if w.formatSource != "" {
+ source = &w.formatSource
+ }
+ plan := &proto.Plan{
+ OpType: &proto.Plan_Command{
+ Command: &proto.Command{
+ CommandType: &proto.Command_WriteOperation{
+ WriteOperation: &proto.WriteOperation{
+ Input: w.relation,
+ Mode: saveMode,
+ Source: source,
+ SaveType: &proto.WriteOperation_Path{
+ Path: path,
+ },
+ },
+ },
+ },
+ },
+ }
+ responseClient, err := w.sparkSession.executePlan(plan)
+ if err != nil {
+ return err
+ }
+
+ return consumeExecutePlanClient(responseClient)
+}
+
+func getSaveMode(mode string) (proto.WriteOperation_SaveMode, error) {
+ if mode == "" {
+ return proto.WriteOperation_SAVE_MODE_UNSPECIFIED, nil
+ } else if strings.EqualFold(mode, "Append") {
+ return proto.WriteOperation_SAVE_MODE_APPEND, nil
+ } else if strings.EqualFold(mode, "Overwrite") {
+ return proto.WriteOperation_SAVE_MODE_OVERWRITE, nil
+ } else if strings.EqualFold(mode, "ErrorIfExists") {
+ return proto.WriteOperation_SAVE_MODE_ERROR_IF_EXISTS, nil
+ } else if strings.EqualFold(mode, "Ignore") {
+ return proto.WriteOperation_SAVE_MODE_IGNORE, nil
+ } else {
+ return 0, fmt.Errorf("unsupported save mode: %s", mode)
+ }
+}
diff --git a/client/sql/dataframewriter_test.go b/client/sql/dataframewriter_test.go
new file mode 100644
index 0000000..6521d57
--- /dev/null
+++ b/client/sql/dataframewriter_test.go
@@ -0,0 +1,33 @@
+package sql
+
+import (
+ proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestGetSaveMode(t *testing.T) {
+ mode, err := getSaveMode("")
+ assert.Nil(t, err)
+ assert.Equal(t, proto.WriteOperation_SAVE_MODE_UNSPECIFIED, mode)
+
+ mode, err = getSaveMode("append")
+ assert.Nil(t, err)
+ assert.Equal(t, proto.WriteOperation_SAVE_MODE_APPEND, mode)
+
+ mode, err = getSaveMode("Overwrite")
+ assert.Nil(t, err)
+ assert.Equal(t, proto.WriteOperation_SAVE_MODE_OVERWRITE, mode)
+
+ mode, err = getSaveMode("ErrorIfExists")
+ assert.Nil(t, err)
+ assert.Equal(t, proto.WriteOperation_SAVE_MODE_ERROR_IF_EXISTS, mode)
+
+ mode, err = getSaveMode("IGNORE")
+ assert.Nil(t, err)
+ assert.Equal(t, proto.WriteOperation_SAVE_MODE_IGNORE, mode)
+
+ mode, err = getSaveMode("XYZ")
+ assert.NotNil(t, err)
+ assert.Equal(t, proto.WriteOperation_SAVE_MODE_UNSPECIFIED, mode)
+}
diff --git a/client/sql/sparksession.go b/client/sql/sparksession.go
index 6a3ca8c..3fa05cd 100644
--- a/client/sql/sparksession.go
+++ b/client/sql/sparksession.go
@@ -18,17 +18,20 @@ package sql
import (
"context"
+ "errors"
"fmt"
"github.com/apache/spark-connect-go/v_3_4/client/channel"
proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
"github.com/google/uuid"
"google.golang.org/grpc/metadata"
+ "io"
)
// SparkSession is the package-level entry point of type
// sparkSessionBuilderEntrypoint. It is presumably what callers use to build new
// sessions; see sparkSessionBuilderEntrypoint for the available methods.
var SparkSession sparkSessionBuilderEntrypoint
type sparkSession interface {
+ Read() DataFrameReader
Sql(query string) (DataFrame, error)
Stop() error
}
@@ -79,6 +82,12 @@ type sparkSessionImpl struct {
metadata metadata.MD
}
+func (s *sparkSessionImpl) Read() DataFrameReader {
+ return &dataFrameReaderImpl{
+ sparkSession: s,
+ }
+}
+
func (s *sparkSessionImpl) Sql(query string) (DataFrame, error) {
plan := &proto.Plan{
OpType: &proto.Plan_Command{
@@ -154,3 +163,21 @@ func (s *sparkSessionImpl) analyzePlan(plan *proto.Plan) (*proto.AnalyzePlanResp
}
return response, nil
}
+
+// consumeExecutePlanClient reads through the returned GRPC stream from Spark Connect Driver. It will
+// discard the returned data if there is no error. This is necessary for handling GRPC response for
+// saving data frame, since such consuming will trigger Spark Connect Driver really saving data frame.
+// If we do not consume the returned GRPC stream, Spark Connect Driver will not really save data frame.
+func consumeExecutePlanClient(responseClient proto.SparkConnectService_ExecutePlanClient) error {
+ for {
+ _, err := responseClient.Recv()
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ return nil
+ } else {
+ return fmt.Errorf("failed to receive plan execution response: %w", err)
+ }
+ }
+ }
+ return nil
+}
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
index 0ea031b..6c0b5db 100644
--- a/cmd/spark-connect-example-spark-session/main.go
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -72,4 +72,19 @@ func main() {
for _, row := range rows {
log.Printf("Row: %v", row)
}
+
+ err = df.Write().Mode("overwrite").
+ Format("parquet").
+ Save("file:///tmp/spark-connect-write-example-output.parquet")
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ df, err = spark.Read().Format("parquet").
+ Load("file:///tmp/spark-connect-write-example-output.parquet")
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ df.Show(100, false)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org