You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/27 16:46:26 UTC
[spark-connect-go] branch master updated: [SPARK-43351] Support more data types when reading from spark connect arrow dataset to data frame; Also implement CreateTempView
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push:
new e9001d2 [SPARK-43351] Support more data types when reading from spark connect arrow dataset to data frame; Also implement CreateTempView
e9001d2 is described below
commit e9001d2edbc2dd9ba83b7e721d79103bbc3bc598
Author: hiboyang <14...@users.noreply.github.com>
AuthorDate: Tue Jun 27 09:46:20 2023 -0700
[SPARK-43351] Support more data types when reading from spark connect arrow dataset to data frame; Also implement CreateTempView
### What changes were proposed in this pull request?
Support more data types when reading from spark connect arrow dataset to data frame; Also implement CreateTempView
### Why are the changes needed?
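Previously, only string, int32 and int64 Arrow columns could be read from a Spark Connect result into a data frame; any other column type failed with an "unsupported arrow data type" error. There was also no way to register a data frame as a temporary view so that it could be queried with SQL.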
### Does this PR introduce _any_ user-facing change?
Yes. Users can now create a temporary view from a data frame, e.g.
```
dataframe.CreateTempView(...)
```
### How was this patch tested?
Added unit tests, and tested manually by running the example program in cmd/spark-connect-example-spark-session.
Closes #11 from hiboyang/bo-dev-03.
Authored-by: hiboyang <14...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
client/sql/dataframe.go | 184 ++++++++++++++---
client/sql/dataframe_test.go | 249 ++++++++++++++++++++++++
client/sql/datatype.go | 77 ++++++++
cmd/spark-connect-example-spark-session/main.go | 15 ++
4 files changed, 497 insertions(+), 28 deletions(-)
diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
index eb1718a..f2a0747 100644
--- a/client/sql/dataframe.go
+++ b/client/sql/dataframe.go
@@ -37,6 +37,8 @@ type DataFrame interface {
Collect() ([]Row, error)
// Write returns a data frame writer, which could be used to save data frame to supported storage.
Write() DataFrameWriter
+ // CreateTempView creates or replaces a temporary view.
+ CreateTempView(viewName string, replace bool, global bool) error
}
// dataFrameImpl is an implementation of DataFrame interface.
@@ -157,6 +159,30 @@ func (df *dataFrameImpl) Write() DataFrameWriter {
return &writer
}
+// CreateTempView registers this data frame as a temporary view named viewName
+// by executing a CreateDataFrameViewCommand plan on the Spark Connect session.
+// The replace and global flags are forwarded as the command's Replace and
+// IsGlobal fields. Errors from sending the plan or from consuming its response
+// are both wrapped with the view name.
+func (df *dataFrameImpl) CreateTempView(viewName string, replace bool, global bool) error {
+	plan := &proto.Plan{
+		OpType: &proto.Plan_Command{
+			Command: &proto.Command{
+				CommandType: &proto.Command_CreateDataframeView{
+					CreateDataframeView: &proto.CreateDataFrameViewCommand{
+						Input:    df.relation,
+						Name:     viewName,
+						Replace:  replace,
+						IsGlobal: global,
+					},
+				},
+			},
+		},
+	}
+
+	responseClient, err := df.sparkSession.executePlan(plan)
+	if err != nil {
+		return fmt.Errorf("failed to create temp view %s: %w", viewName, err)
+	}
+
+	// Wrap this error too, so both failure paths tell the caller which view
+	// could not be created.
+	if err := consumeExecutePlanClient(responseClient); err != nil {
+		return fmt.Errorf("failed to create temp view %s: %w", viewName, err)
+	}
+	return nil
+}
+
func (df *dataFrameImpl) createPlan() *proto.Plan {
return &proto.Plan{
OpType: &proto.Plan_Root{
@@ -208,38 +234,16 @@ func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
return nil, fmt.Errorf("failed to read arrow: %w", err)
}
}
- numColumns := len(arrowReader.Schema().Fields())
+
+ values, err := readArrowRecord(record)
+ if err != nil {
+ return nil, err
+ }
+
numRows := int(record.NumRows())
if rows == nil {
rows = make([]Row, 0, numRows)
}
- values := make([][]any, numRows)
- for i := range values {
- values[i] = make([]any, numColumns)
- }
- for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
- columnData := record.Column(columnIndex).Data()
- dataTypeId := columnData.DataType().ID()
- switch dataTypeId {
- case arrow.STRING:
- vector := array.NewStringData(columnData)
- for rowIndex := 0; rowIndex < numRows; rowIndex++ {
- values[rowIndex][columnIndex] = vector.Value(rowIndex)
- }
- case arrow.INT32:
- vector := array.NewInt32Data(columnData)
- for rowIndex := 0; rowIndex < numRows; rowIndex++ {
- values[rowIndex][columnIndex] = vector.Value(rowIndex)
- }
- case arrow.INT64:
- vector := array.NewInt64Data(columnData)
- for rowIndex := 0; rowIndex < numRows; rowIndex++ {
- values[rowIndex][columnIndex] = vector.Value(rowIndex)
- }
- default:
- return nil, fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeId.String(), columnIndex)
- }
- }
for _, v := range values {
row := &GenericRowWithSchema{
@@ -258,6 +262,107 @@ func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
return rows, nil
}
+// readArrowRecord reads every value of an arrow record into a row-major
+// [][]any: the outer slice has one entry per row, and each inner slice holds
+// that row's values in column order. It returns the first error reported by
+// readArrowRecordColumn (for example, an unsupported column data type).
+func readArrowRecord(record arrow.Record) ([][]any, error) {
+	numRows := int(record.NumRows())
+	numColumns := int(record.NumCols())
+
+	values := make([][]any, numRows)
+	for i := range values {
+		values[i] = make([]any, numColumns)
+	}
+
+	for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
+		if err := readArrowRecordColumn(record, columnIndex, values); err != nil {
+			return nil, err
+		}
+	}
+	return values, nil
+}
+
+// readArrowRecordColumn reads every value of the column at columnIndex in
+// record and stores it in values[rowIndex][columnIndex]. Null slots are stored
+// as nil, so they stay distinguishable from zero values such as 0 or "".
+// values must already contain record.NumRows() rows, each with room for
+// columnIndex. It returns an error if the column's arrow data type is not
+// supported.
+func readArrowRecordColumn(record arrow.Record, columnIndex int, values [][]any) error {
+	numRows := int(record.NumRows())
+	// Use the record's own column array directly. Wrapping its data with
+	// array.NewXxxData would retain the data and require a matching Release.
+	column := record.Column(columnIndex)
+	dataTypeID := column.DataType().ID()
+
+	// valueAt returns the non-null value at rowIndex as the Go type exposed by
+	// the column's concrete arrow array.
+	var valueAt func(rowIndex int) any
+	switch dataTypeID {
+	case arrow.BOOL:
+		vector := column.(*array.Boolean)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.INT8:
+		vector := column.(*array.Int8)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.INT16:
+		vector := column.(*array.Int16)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.INT32:
+		vector := column.(*array.Int32)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.INT64:
+		vector := column.(*array.Int64)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.FLOAT16:
+		vector := column.(*array.Float16)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.FLOAT32:
+		vector := column.(*array.Float32)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.FLOAT64:
+		vector := column.(*array.Float64)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.DECIMAL128:
+		// A single case is enough: the previous `arrow.DECIMAL | arrow.DECIMAL128`
+		// was a bitwise OR of enum values. It matched only because arrow.DECIMAL
+		// aliases DECIMAL128 in arrow/go.
+		vector := column.(*array.Decimal128)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.DECIMAL256:
+		vector := column.(*array.Decimal256)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.STRING:
+		vector := column.(*array.String)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.BINARY:
+		vector := column.(*array.Binary)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.TIMESTAMP:
+		vector := column.(*array.Timestamp)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.DATE32:
+		// Arrow dates use either day units (DATE32) or millisecond units (DATE64).
+		// NOTE(review): Spark is expected to send DateType as DATE32; verify
+		// against the server.
+		vector := column.(*array.Date32)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	case arrow.DATE64:
+		vector := column.(*array.Date64)
+		valueAt = func(rowIndex int) any { return vector.Value(rowIndex) }
+	default:
+		return fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeID.String(), columnIndex)
+	}
+
+	for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+		if column.IsNull(rowIndex) {
+			// Value() on a null slot yields the type's zero value; keep NULL as nil.
+			values[rowIndex][columnIndex] = nil
+			continue
+		}
+		values[rowIndex][columnIndex] = valueAt(rowIndex)
+	}
+	return nil
+}
+
func convertProtoDataTypeToStructType(input *proto.DataType) *StructType {
dataTypeStruct := input.GetStruct()
if dataTypeStruct == nil {
@@ -283,12 +388,35 @@ func convertProtoStructField(field *proto.DataType_StructField) StructField {
}
}
+// convertProtoDataTypeToDataType converts protobuf data type to Spark connect sql data type
func convertProtoDataTypeToDataType(input *proto.DataType) DataType {
switch v := input.GetKind().(type) {
+ case *proto.DataType_Boolean_:
+ return BooleanType{}
+ case *proto.DataType_Byte_:
+ return ByteType{}
+ case *proto.DataType_Short_:
+ return ShortType{}
case *proto.DataType_Integer_:
return IntegerType{}
+ case *proto.DataType_Long_:
+ return LongType{}
+ case *proto.DataType_Float_:
+ return FloatType{}
+ case *proto.DataType_Double_:
+ return DoubleType{}
+ case *proto.DataType_Decimal_:
+ return DecimalType{}
case *proto.DataType_String_:
return StringType{}
+ case *proto.DataType_Binary_:
+ return BinaryType{}
+ case *proto.DataType_Timestamp_:
+ return TimestampType{}
+ case *proto.DataType_TimestampNtz:
+ return TimestampNtzType{}
+ case *proto.DataType_Date_:
+ return DateType{}
default:
return UnsupportedType{
TypeInfo: v,
diff --git a/client/sql/dataframe_test.go b/client/sql/dataframe_test.go
index d8c3c80..7775b74 100644
--- a/client/sql/dataframe_test.go
+++ b/client/sql/dataframe_test.go
@@ -20,8 +20,12 @@ import (
"bytes"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
+ "github.com/apache/arrow/go/v12/arrow/decimal128"
+ "github.com/apache/arrow/go/v12/arrow/decimal256"
+ "github.com/apache/arrow/go/v12/arrow/float16"
"github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/apache/arrow/go/v12/arrow/memory"
+ proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
@@ -55,3 +59,248 @@ func TestShowArrowBatchData(t *testing.T) {
err = showArrowBatchData(buf.Bytes())
assert.Nil(t, err)
}
+
+// TestReadArrowRecord checks that readArrowRecord turns a two-row record with
+// one column for each of 14 arrow types into row-major values of the expected
+// Go types.
+func TestReadArrowRecord(t *testing.T) {
+	arrowFields := []arrow.Field{
+		{Name: "boolean_column", Type: &arrow.BooleanType{}},
+		{Name: "int8_column", Type: &arrow.Int8Type{}},
+		{Name: "int16_column", Type: &arrow.Int16Type{}},
+		{Name: "int32_column", Type: &arrow.Int32Type{}},
+		{Name: "int64_column", Type: &arrow.Int64Type{}},
+		{Name: "float16_column", Type: &arrow.Float16Type{}},
+		{Name: "float32_column", Type: &arrow.Float32Type{}},
+		{Name: "float64_column", Type: &arrow.Float64Type{}},
+		{Name: "decimal128_column", Type: &arrow.Decimal128Type{}},
+		{Name: "decimal256_column", Type: &arrow.Decimal256Type{}},
+		{Name: "string_column", Type: &arrow.StringType{}},
+		{Name: "binary_column", Type: &arrow.BinaryType{}},
+		{Name: "timestamp_column", Type: &arrow.TimestampType{}},
+		{Name: "date64_column", Type: &arrow.Date64Type{}},
+	}
+	arrowSchema := arrow.NewSchema(arrowFields, nil)
+
+	alloc := memory.NewGoAllocator()
+	recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+	defer recordBuilder.Release()
+
+	// Fields are filled in schema order; i tracks the current field index.
+	i := 0
+	recordBuilder.Field(i).(*array.BooleanBuilder).Append(false)
+	recordBuilder.Field(i).(*array.BooleanBuilder).Append(true)
+
+	i++
+	recordBuilder.Field(i).(*array.Int8Builder).Append(1)
+	recordBuilder.Field(i).(*array.Int8Builder).Append(2)
+
+	i++
+	recordBuilder.Field(i).(*array.Int16Builder).Append(10)
+	recordBuilder.Field(i).(*array.Int16Builder).Append(20)
+
+	i++
+	recordBuilder.Field(i).(*array.Int32Builder).Append(100)
+	recordBuilder.Field(i).(*array.Int32Builder).Append(200)
+
+	i++
+	recordBuilder.Field(i).(*array.Int64Builder).Append(1000)
+	recordBuilder.Field(i).(*array.Int64Builder).Append(2000)
+
+	i++
+	recordBuilder.Field(i).(*array.Float16Builder).Append(float16.New(10000.1))
+	recordBuilder.Field(i).(*array.Float16Builder).Append(float16.New(20000.1))
+
+	i++
+	recordBuilder.Field(i).(*array.Float32Builder).Append(100000.1)
+	recordBuilder.Field(i).(*array.Float32Builder).Append(200000.1)
+
+	i++
+	recordBuilder.Field(i).(*array.Float64Builder).Append(1000000.1)
+	recordBuilder.Field(i).(*array.Float64Builder).Append(2000000.1)
+
+	i++
+	recordBuilder.Field(i).(*array.Decimal128Builder).Append(decimal128.FromI64(10000000))
+	recordBuilder.Field(i).(*array.Decimal128Builder).Append(decimal128.FromI64(20000000))
+
+	i++
+	recordBuilder.Field(i).(*array.Decimal256Builder).Append(decimal256.FromI64(100000000))
+	recordBuilder.Field(i).(*array.Decimal256Builder).Append(decimal256.FromI64(200000000))
+
+	i++
+	recordBuilder.Field(i).(*array.StringBuilder).Append("str1")
+	recordBuilder.Field(i).(*array.StringBuilder).Append("str2")
+
+	i++
+	recordBuilder.Field(i).(*array.BinaryBuilder).Append([]byte("bytes1"))
+	recordBuilder.Field(i).(*array.BinaryBuilder).Append([]byte("bytes2"))
+
+	i++
+	recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp(1686981953115000))
+	recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp(1686981953116000))
+
+	i++
+	recordBuilder.Field(i).(*array.Date64Builder).Append(arrow.Date64(1686981953117000))
+	recordBuilder.Field(i).(*array.Date64Builder).Append(arrow.Date64(1686981953118000))
+
+	record := recordBuilder.NewRecord()
+	defer record.Release()
+
+	values, err := readArrowRecord(record)
+	require.NoError(t, err)
+	// require (not assert): a short result must stop the test here instead of
+	// panicking on values[1] below.
+	require.Len(t, values, 2)
+	assert.Equal(t, []any{
+		false, int8(1), int16(10), int32(100), int64(1000),
+		float16.New(10000.1), float32(100000.1), 1000000.1,
+		decimal128.FromI64(10000000), decimal256.FromI64(100000000),
+		"str1", []byte("bytes1"),
+		arrow.Timestamp(1686981953115000), arrow.Date64(1686981953117000)},
+		values[0])
+	assert.Equal(t, []any{
+		true, int8(2), int16(20), int32(200), int64(2000),
+		float16.New(20000.1), float32(200000.1), 2000000.1,
+		decimal128.FromI64(20000000), decimal256.FromI64(200000000),
+		"str2", []byte("bytes2"),
+		arrow.Timestamp(1686981953116000), arrow.Date64(1686981953118000)},
+		values[1])
+}
+
+// TestReadArrowRecord_UnsupportedType checks that readArrowRecord reports an
+// error for a record whose only column is an arrow month-interval column.
+func TestReadArrowRecord_UnsupportedType(t *testing.T) {
+	arrowFields := []arrow.Field{
+		{Name: "unsupported_type_column", Type: &arrow.MonthIntervalType{}},
+	}
+	arrowSchema := arrow.NewSchema(arrowFields, nil)
+
+	alloc := memory.NewGoAllocator()
+	recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+	defer recordBuilder.Release()
+
+	recordBuilder.Field(0).(*array.MonthIntervalBuilder).Append(1)
+
+	record := recordBuilder.NewRecord()
+	defer record.Release()
+
+	_, err := readArrowRecord(record)
+	require.Error(t, err)
+}
+
+// TestConvertProtoDataTypeToDataType verifies, for each supported protobuf
+// data type kind, the TypeName of the converted Spark Connect data type.
+func TestConvertProtoDataTypeToDataType(t *testing.T) {
+	testCases := []struct {
+		expectedTypeName string
+		input            *proto.DataType
+	}{
+		{"Boolean", &proto.DataType{Kind: &proto.DataType_Boolean_{}}},
+		{"Byte", &proto.DataType{Kind: &proto.DataType_Byte_{}}},
+		{"Short", &proto.DataType{Kind: &proto.DataType_Short_{}}},
+		{"Integer", &proto.DataType{Kind: &proto.DataType_Integer_{}}},
+		{"Long", &proto.DataType{Kind: &proto.DataType_Long_{}}},
+		{"Float", &proto.DataType{Kind: &proto.DataType_Float_{}}},
+		{"Double", &proto.DataType{Kind: &proto.DataType_Double_{}}},
+		{"Decimal", &proto.DataType{Kind: &proto.DataType_Decimal_{}}},
+		{"String", &proto.DataType{Kind: &proto.DataType_String_{}}},
+		{"Binary", &proto.DataType{Kind: &proto.DataType_Binary_{}}},
+		{"Timestamp", &proto.DataType{Kind: &proto.DataType_Timestamp_{}}},
+		{"TimestampNtz", &proto.DataType{Kind: &proto.DataType_TimestampNtz{}}},
+		{"Date", &proto.DataType{Kind: &proto.DataType_Date_{}}},
+	}
+	for _, tc := range testCases {
+		converted := convertProtoDataTypeToDataType(tc.input)
+		assert.Equal(t, tc.expectedTypeName, converted.TypeName())
+	}
+}
+
+// TestConvertProtoDataTypeToDataType_UnsupportedType verifies that a protobuf
+// kind with no dedicated mapping (year-month interval) converts to a type named
+// "Unsupported".
+func TestConvertProtoDataTypeToDataType_UnsupportedType(t *testing.T) {
+	converted := convertProtoDataTypeToDataType(&proto.DataType{Kind: &proto.DataType_YearMonthInterval_{}})
+	assert.Equal(t, "Unsupported", converted.TypeName())
+}
diff --git a/client/sql/datatype.go b/client/sql/datatype.go
index 05ab02e..e201114 100644
--- a/client/sql/datatype.go
+++ b/client/sql/datatype.go
@@ -25,6 +25,27 @@ type DataType interface {
TypeName() string
}
+// BooleanType is the DataType for Spark boolean columns.
+type BooleanType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t BooleanType) TypeName() string { return getDataTypeName(t) }
+
+// ByteType is the DataType for Spark byte columns.
+type ByteType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t ByteType) TypeName() string { return getDataTypeName(t) }
+
+// ShortType is the DataType for Spark short columns.
+type ShortType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t ShortType) TypeName() string { return getDataTypeName(t) }
+
type IntegerType struct {
}
@@ -32,6 +53,34 @@ func (t IntegerType) TypeName() string {
return getDataTypeName(t)
}
+// LongType is the DataType for Spark long columns.
+type LongType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t LongType) TypeName() string { return getDataTypeName(t) }
+
+// FloatType is the DataType for Spark float columns.
+type FloatType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t FloatType) TypeName() string { return getDataTypeName(t) }
+
+// DoubleType is the DataType for Spark double columns.
+type DoubleType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t DoubleType) TypeName() string { return getDataTypeName(t) }
+
+// DecimalType is the DataType for Spark decimal columns. It currently records
+// no precision or scale.
+type DecimalType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t DecimalType) TypeName() string { return getDataTypeName(t) }
+
type StringType struct {
}
@@ -39,6 +88,34 @@ func (t StringType) TypeName() string {
return getDataTypeName(t)
}
+// BinaryType is the DataType for Spark binary columns.
+type BinaryType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t BinaryType) TypeName() string { return getDataTypeName(t) }
+
+// TimestampType is the DataType for Spark timestamp columns.
+type TimestampType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t TimestampType) TypeName() string { return getDataTypeName(t) }
+
+// TimestampNtzType is the DataType for Spark timestamp_ntz columns.
+type TimestampNtzType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t TimestampNtzType) TypeName() string { return getDataTypeName(t) }
+
+// DateType is the DataType for Spark date columns.
+type DateType struct{}
+
+// TypeName returns the name computed for this type by getDataTypeName.
+func (t DateType) TypeName() string { return getDataTypeName(t) }
+
type UnsupportedType struct {
TypeInfo any
}
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
index 6c0b5db..c35bbeb 100644
--- a/cmd/spark-connect-example-spark-session/main.go
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -41,6 +41,7 @@ func main() {
log.Fatalf("Failed: %s", err.Error())
}
+ log.Printf("DataFrame from sql: select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
err = df.Show(100, false)
if err != nil {
log.Fatalf("Failed: %s", err.Error())
@@ -86,5 +87,19 @@ func main() {
log.Fatalf("Failed: %s", err.Error())
}
+ log.Printf("DataFrame from reading parquet")
+ df.Show(100, false)
+
+ err = df.CreateTempView("view1", true, false)
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ df, err = spark.Sql("select count, word from view1 order by count")
+ if err != nil {
+ log.Fatalf("Failed: %s", err.Error())
+ }
+
+ log.Printf("DataFrame from sql: select count, word from view1 order by count")
df.Show(100, false)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org