Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/27 16:46:26 UTC

[spark-connect-go] branch master updated: [SPARK-43351] Support more data types when reading from a Spark Connect Arrow dataset into a data frame; also implement CreateTempView

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e9001d2  [SPARK-43351] Support more data types when reading from a Spark Connect Arrow dataset into a data frame; also implement CreateTempView
e9001d2 is described below

commit e9001d2edbc2dd9ba83b7e721d79103bbc3bc598
Author: hiboyang <14...@users.noreply.github.com>
AuthorDate: Tue Jun 27 09:46:20 2023 -0700

    [SPARK-43351] Support more data types when reading from a Spark Connect Arrow dataset into a data frame; also implement CreateTempView
    
    ### What changes were proposed in this pull request?
    
    Support more data types when reading from a Spark Connect Arrow dataset into a data frame; also implement CreateTempView
    
    ### Why are the changes needed?
    
    Previously the Go client could only convert STRING, INT32, and INT64 Arrow columns into data frame rows; any other column type failed with an "unsupported arrow data type" error. This change adds support for the remaining common primitive types (boolean, the full integer and floating-point widths, decimal, binary, timestamp, and date), and implements CreateTempView so a data frame can be registered as a view and queried through SQL.
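    
    As an illustration, here is a hedged sketch of what now works end to end (the session variable `spark` follows the repository example; the query and column names are illustrative):
    ```
    // Query columns of several newly supported types and collect them into Go values.
    // Before this change, each of these columns failed with "unsupported arrow data type".
    df, err := spark.Sql("select true as b, cast(1 as tinyint) as i8, " +
        "cast(1.5 as double) as d, current_timestamp() as ts")
    if err != nil {
        log.Fatalf("Failed: %s", err.Error())
    }
    rows, err := df.Collect()
    if err != nil {
        log.Fatalf("Failed: %s", err.Error())
    }
    // Collected values arrive as bool, int8, float64, and arrow.Timestamp.
    log.Printf("collected %d rows", len(rows))
    ```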
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users can now create a temporary view from a data frame, e.g.
    ```
    dataframe.CreateTempView(...)
    ```
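    
    A fuller sketch, mirroring the flow in cmd/spark-connect-example-spark-session/main.go (the session `spark` and data frame `df` are assumed to already exist):
    ```
    // Create or replace a local temporary view named "view1", then query it back
    // through SQL. The signature is CreateTempView(viewName, replace, global).
    err := df.CreateTempView("view1", true, false)
    if err != nil {
        log.Fatalf("Failed: %s", err.Error())
    }
    df, err = spark.Sql("select count, word from view1 order by count")
    if err != nil {
        log.Fatalf("Failed: %s", err.Error())
    }
    ```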
    
    ### How was this patch tested?
    
    Unit tests, and manual testing by running the example code.
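    
    The new unit tests are in client/sql/dataframe_test.go; assuming a standard Go toolchain and a checkout of this repository, they can be run with:
    ```
    go test ./client/sql/...
    ```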
    
    Closes #11 from hiboyang/bo-dev-03.
    
    Authored-by: hiboyang <14...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 client/sql/dataframe.go                         | 184 ++++++++++++++---
 client/sql/dataframe_test.go                    | 249 ++++++++++++++++++++++++
 client/sql/datatype.go                          |  77 ++++++++
 cmd/spark-connect-example-spark-session/main.go |  15 ++
 4 files changed, 497 insertions(+), 28 deletions(-)

diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
index eb1718a..f2a0747 100644
--- a/client/sql/dataframe.go
+++ b/client/sql/dataframe.go
@@ -37,6 +37,8 @@ type DataFrame interface {
 	Collect() ([]Row, error)
 	// Write returns a data frame writer, which could be used to save data frame to supported storage.
 	Write() DataFrameWriter
+	// CreateTempView creates or replaces a temporary view.
+	// CreateTempView creates a temporary view with the given name; replace controls whether an existing view is replaced, and global creates a global temporary view.
 }
 
 // dataFrameImpl is an implementation of DataFrame interface.
@@ -157,6 +159,30 @@ func (df *dataFrameImpl) Write() DataFrameWriter {
 	return &writer
 }
 
+func (df *dataFrameImpl) CreateTempView(viewName string, replace bool, global bool) error {
+	plan := &proto.Plan{
+		OpType: &proto.Plan_Command{
+			Command: &proto.Command{
+				CommandType: &proto.Command_CreateDataframeView{
+					CreateDataframeView: &proto.CreateDataFrameViewCommand{
+						Input:    df.relation,
+						Name:     viewName,
+						Replace:  replace,
+						IsGlobal: global,
+					},
+				},
+			},
+		},
+	}
+
+	responseClient, err := df.sparkSession.executePlan(plan)
+	if err != nil {
+		return fmt.Errorf("failed to create temp view %s: %w", viewName, err)
+	}
+
+	return consumeExecutePlanClient(responseClient)
+}
+
 func (df *dataFrameImpl) createPlan() *proto.Plan {
 	return &proto.Plan{
 		OpType: &proto.Plan_Root{
@@ -208,38 +234,16 @@ func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
 				return nil, fmt.Errorf("failed to read arrow: %w", err)
 			}
 		}
-		numColumns := len(arrowReader.Schema().Fields())
+
+		values, err := readArrowRecord(record)
+		if err != nil {
+			return nil, err
+		}
+
 		numRows := int(record.NumRows())
 		if rows == nil {
 			rows = make([]Row, 0, numRows)
 		}
-		values := make([][]any, numRows)
-		for i := range values {
-			values[i] = make([]any, numColumns)
-		}
-		for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
-			columnData := record.Column(columnIndex).Data()
-			dataTypeId := columnData.DataType().ID()
-			switch dataTypeId {
-			case arrow.STRING:
-				vector := array.NewStringData(columnData)
-				for rowIndex := 0; rowIndex < numRows; rowIndex++ {
-					values[rowIndex][columnIndex] = vector.Value(rowIndex)
-				}
-			case arrow.INT32:
-				vector := array.NewInt32Data(columnData)
-				for rowIndex := 0; rowIndex < numRows; rowIndex++ {
-					values[rowIndex][columnIndex] = vector.Value(rowIndex)
-				}
-			case arrow.INT64:
-				vector := array.NewInt64Data(columnData)
-				for rowIndex := 0; rowIndex < numRows; rowIndex++ {
-					values[rowIndex][columnIndex] = vector.Value(rowIndex)
-				}
-			default:
-				return nil, fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeId.String(), columnIndex)
-			}
-		}
 
 		for _, v := range values {
 			row := &GenericRowWithSchema{
@@ -258,6 +262,107 @@ func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
 	return rows, nil
 }
 
+// readArrowRecord reads all values from an arrow record and returns them as [][]any
+func readArrowRecord(record arrow.Record) ([][]any, error) {
+	numRows := record.NumRows()
+	numColumns := int(record.NumCols())
+
+	values := make([][]any, numRows)
+	for i := range values {
+		values[i] = make([]any, numColumns)
+	}
+
+	for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
+		err := readArrowRecordColumn(record, columnIndex, values)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return values, nil
+}
+
+// readArrowRecordColumn reads all values in a column and stores them in values
+func readArrowRecordColumn(record arrow.Record, columnIndex int, values [][]any) error {
+	numRows := int(record.NumRows())
+	columnData := record.Column(columnIndex).Data()
+	dataTypeId := columnData.DataType().ID()
+	switch dataTypeId {
+	case arrow.BOOL:
+		vector := array.NewBooleanData(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.INT8:
+		vector := array.NewInt8Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.INT16:
+		vector := array.NewInt16Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.INT32:
+		vector := array.NewInt32Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.INT64:
+		vector := array.NewInt64Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.FLOAT16:
+		vector := array.NewFloat16Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.FLOAT32:
+		vector := array.NewFloat32Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.FLOAT64:
+		vector := array.NewFloat64Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.DECIMAL128: // arrow.DECIMAL is an alias for arrow.DECIMAL128
+		vector := array.NewDecimal128Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.DECIMAL256:
+		vector := array.NewDecimal256Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.STRING:
+		vector := array.NewStringData(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.BINARY:
+		vector := array.NewBinaryData(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.TIMESTAMP:
+		vector := array.NewTimestampData(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	case arrow.DATE64:
+		vector := array.NewDate64Data(columnData)
+		for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+			values[rowIndex][columnIndex] = vector.Value(rowIndex)
+		}
+	default:
+		return fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeId.String(), columnIndex)
+	}
+	return nil
+}
+
 func convertProtoDataTypeToStructType(input *proto.DataType) *StructType {
 	dataTypeStruct := input.GetStruct()
 	if dataTypeStruct == nil {
@@ -283,12 +388,35 @@ func convertProtoStructField(field *proto.DataType_StructField) StructField {
 	}
 }
 
+// convertProtoDataTypeToDataType converts a protobuf data type to a Spark Connect SQL data type
 func convertProtoDataTypeToDataType(input *proto.DataType) DataType {
 	switch v := input.GetKind().(type) {
+	case *proto.DataType_Boolean_:
+		return BooleanType{}
+	case *proto.DataType_Byte_:
+		return ByteType{}
+	case *proto.DataType_Short_:
+		return ShortType{}
 	case *proto.DataType_Integer_:
 		return IntegerType{}
+	case *proto.DataType_Long_:
+		return LongType{}
+	case *proto.DataType_Float_:
+		return FloatType{}
+	case *proto.DataType_Double_:
+		return DoubleType{}
+	case *proto.DataType_Decimal_:
+		return DecimalType{}
 	case *proto.DataType_String_:
 		return StringType{}
+	case *proto.DataType_Binary_:
+		return BinaryType{}
+	case *proto.DataType_Timestamp_:
+		return TimestampType{}
+	case *proto.DataType_TimestampNtz:
+		return TimestampNtzType{}
+	case *proto.DataType_Date_:
+		return DateType{}
 	default:
 		return UnsupportedType{
 			TypeInfo: v,
diff --git a/client/sql/dataframe_test.go b/client/sql/dataframe_test.go
index d8c3c80..7775b74 100644
--- a/client/sql/dataframe_test.go
+++ b/client/sql/dataframe_test.go
@@ -20,8 +20,12 @@ import (
 	"bytes"
 	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/array"
+	"github.com/apache/arrow/go/v12/arrow/decimal128"
+	"github.com/apache/arrow/go/v12/arrow/decimal256"
+	"github.com/apache/arrow/go/v12/arrow/float16"
 	"github.com/apache/arrow/go/v12/arrow/ipc"
 	"github.com/apache/arrow/go/v12/arrow/memory"
+	proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"testing"
@@ -55,3 +59,248 @@ func TestShowArrowBatchData(t *testing.T) {
 	err = showArrowBatchData(buf.Bytes())
 	assert.Nil(t, err)
 }
+
+func TestReadArrowRecord(t *testing.T) {
+	arrowFields := []arrow.Field{
+		{
+			Name: "boolean_column",
+			Type: &arrow.BooleanType{},
+		},
+		{
+			Name: "int8_column",
+			Type: &arrow.Int8Type{},
+		},
+		{
+			Name: "int16_column",
+			Type: &arrow.Int16Type{},
+		},
+		{
+			Name: "int32_column",
+			Type: &arrow.Int32Type{},
+		},
+		{
+			Name: "int64_column",
+			Type: &arrow.Int64Type{},
+		},
+		{
+			Name: "float16_column",
+			Type: &arrow.Float16Type{},
+		},
+		{
+			Name: "float32_column",
+			Type: &arrow.Float32Type{},
+		},
+		{
+			Name: "float64_column",
+			Type: &arrow.Float64Type{},
+		},
+		{
+			Name: "decimal128_column",
+			Type: &arrow.Decimal128Type{},
+		},
+		{
+			Name: "decimal256_column",
+			Type: &arrow.Decimal256Type{},
+		},
+		{
+			Name: "string_column",
+			Type: &arrow.StringType{},
+		},
+		{
+			Name: "binary_column",
+			Type: &arrow.BinaryType{},
+		},
+		{
+			Name: "timestamp_column",
+			Type: &arrow.TimestampType{},
+		},
+		{
+			Name: "date64_column",
+			Type: &arrow.Date64Type{},
+		},
+	}
+	arrowSchema := arrow.NewSchema(arrowFields, nil)
+	var buf bytes.Buffer
+	arrowWriter := ipc.NewWriter(&buf, ipc.WithSchema(arrowSchema))
+	defer arrowWriter.Close()
+
+	alloc := memory.NewGoAllocator()
+	recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+	defer recordBuilder.Release()
+
+	i := 0
+	recordBuilder.Field(i).(*array.BooleanBuilder).Append(false)
+	recordBuilder.Field(i).(*array.BooleanBuilder).Append(true)
+
+	i++
+	recordBuilder.Field(i).(*array.Int8Builder).Append(1)
+	recordBuilder.Field(i).(*array.Int8Builder).Append(2)
+
+	i++
+	recordBuilder.Field(i).(*array.Int16Builder).Append(10)
+	recordBuilder.Field(i).(*array.Int16Builder).Append(20)
+
+	i++
+	recordBuilder.Field(i).(*array.Int32Builder).Append(100)
+	recordBuilder.Field(i).(*array.Int32Builder).Append(200)
+
+	i++
+	recordBuilder.Field(i).(*array.Int64Builder).Append(1000)
+	recordBuilder.Field(i).(*array.Int64Builder).Append(2000)
+
+	i++
+	recordBuilder.Field(i).(*array.Float16Builder).Append(float16.New(10000.1))
+	recordBuilder.Field(i).(*array.Float16Builder).Append(float16.New(20000.1))
+
+	i++
+	recordBuilder.Field(i).(*array.Float32Builder).Append(100000.1)
+	recordBuilder.Field(i).(*array.Float32Builder).Append(200000.1)
+
+	i++
+	recordBuilder.Field(i).(*array.Float64Builder).Append(1000000.1)
+	recordBuilder.Field(i).(*array.Float64Builder).Append(2000000.1)
+
+	i++
+	recordBuilder.Field(i).(*array.Decimal128Builder).Append(decimal128.FromI64(10000000))
+	recordBuilder.Field(i).(*array.Decimal128Builder).Append(decimal128.FromI64(20000000))
+
+	i++
+	recordBuilder.Field(i).(*array.Decimal256Builder).Append(decimal256.FromI64(100000000))
+	recordBuilder.Field(i).(*array.Decimal256Builder).Append(decimal256.FromI64(200000000))
+
+	i++
+	recordBuilder.Field(i).(*array.StringBuilder).Append("str1")
+	recordBuilder.Field(i).(*array.StringBuilder).Append("str2")
+
+	i++
+	recordBuilder.Field(i).(*array.BinaryBuilder).Append([]byte("bytes1"))
+	recordBuilder.Field(i).(*array.BinaryBuilder).Append([]byte("bytes2"))
+
+	i++
+	recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp(1686981953115000))
+	recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp(1686981953116000))
+
+	i++
+	recordBuilder.Field(i).(*array.Date64Builder).Append(arrow.Date64(1686981953117000))
+	recordBuilder.Field(i).(*array.Date64Builder).Append(arrow.Date64(1686981953118000))
+
+	record := recordBuilder.NewRecord()
+	defer record.Release()
+
+	values, err := readArrowRecord(record)
+	require.Nil(t, err)
+	assert.Equal(t, 2, len(values))
+	assert.Equal(t, []any{
+		false, int8(1), int16(10), int32(100), int64(1000),
+		float16.New(10000.1), float32(100000.1), 1000000.1,
+		decimal128.FromI64(10000000), decimal256.FromI64(100000000),
+		"str1", []byte("bytes1"),
+		arrow.Timestamp(1686981953115000), arrow.Date64(1686981953117000)},
+		values[0])
+	assert.Equal(t, []any{
+		true, int8(2), int16(20), int32(200), int64(2000),
+		float16.New(20000.1), float32(200000.1), 2000000.1,
+		decimal128.FromI64(20000000), decimal256.FromI64(200000000),
+		"str2", []byte("bytes2"),
+		arrow.Timestamp(1686981953116000), arrow.Date64(1686981953118000)},
+		values[1])
+}
+
+func TestReadArrowRecord_UnsupportedType(t *testing.T) {
+	arrowFields := []arrow.Field{
+		{
+			Name: "unsupported_type_column",
+			Type: &arrow.MonthIntervalType{},
+		},
+	}
+	arrowSchema := arrow.NewSchema(arrowFields, nil)
+	var buf bytes.Buffer
+	arrowWriter := ipc.NewWriter(&buf, ipc.WithSchema(arrowSchema))
+	defer arrowWriter.Close()
+
+	alloc := memory.NewGoAllocator()
+	recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+	defer recordBuilder.Release()
+
+	recordBuilder.Field(0).(*array.MonthIntervalBuilder).Append(1)
+
+	record := recordBuilder.NewRecord()
+	defer record.Release()
+
+	_, err := readArrowRecord(record)
+	require.NotNil(t, err)
+}
+
+func TestConvertProtoDataTypeToDataType(t *testing.T) {
+	booleanDataType := &proto.DataType{
+		Kind: &proto.DataType_Boolean_{},
+	}
+	assert.Equal(t, "Boolean", convertProtoDataTypeToDataType(booleanDataType).TypeName())
+
+	byteDataType := &proto.DataType{
+		Kind: &proto.DataType_Byte_{},
+	}
+	assert.Equal(t, "Byte", convertProtoDataTypeToDataType(byteDataType).TypeName())
+
+	shortDataType := &proto.DataType{
+		Kind: &proto.DataType_Short_{},
+	}
+	assert.Equal(t, "Short", convertProtoDataTypeToDataType(shortDataType).TypeName())
+
+	integerDataType := &proto.DataType{
+		Kind: &proto.DataType_Integer_{},
+	}
+	assert.Equal(t, "Integer", convertProtoDataTypeToDataType(integerDataType).TypeName())
+
+	longDataType := &proto.DataType{
+		Kind: &proto.DataType_Long_{},
+	}
+	assert.Equal(t, "Long", convertProtoDataTypeToDataType(longDataType).TypeName())
+
+	floatDataType := &proto.DataType{
+		Kind: &proto.DataType_Float_{},
+	}
+	assert.Equal(t, "Float", convertProtoDataTypeToDataType(floatDataType).TypeName())
+
+	doubleDataType := &proto.DataType{
+		Kind: &proto.DataType_Double_{},
+	}
+	assert.Equal(t, "Double", convertProtoDataTypeToDataType(doubleDataType).TypeName())
+
+	decimalDataType := &proto.DataType{
+		Kind: &proto.DataType_Decimal_{},
+	}
+	assert.Equal(t, "Decimal", convertProtoDataTypeToDataType(decimalDataType).TypeName())
+
+	stringDataType := &proto.DataType{
+		Kind: &proto.DataType_String_{},
+	}
+	assert.Equal(t, "String", convertProtoDataTypeToDataType(stringDataType).TypeName())
+
+	binaryDataType := &proto.DataType{
+		Kind: &proto.DataType_Binary_{},
+	}
+	assert.Equal(t, "Binary", convertProtoDataTypeToDataType(binaryDataType).TypeName())
+
+	timestampDataType := &proto.DataType{
+		Kind: &proto.DataType_Timestamp_{},
+	}
+	assert.Equal(t, "Timestamp", convertProtoDataTypeToDataType(timestampDataType).TypeName())
+
+	timestampNtzDataType := &proto.DataType{
+		Kind: &proto.DataType_TimestampNtz{},
+	}
+	assert.Equal(t, "TimestampNtz", convertProtoDataTypeToDataType(timestampNtzDataType).TypeName())
+
+	dateDataType := &proto.DataType{
+		Kind: &proto.DataType_Date_{},
+	}
+	assert.Equal(t, "Date", convertProtoDataTypeToDataType(dateDataType).TypeName())
+}
+
+func TestConvertProtoDataTypeToDataType_UnsupportedType(t *testing.T) {
+	unsupportedDataType := &proto.DataType{
+		Kind: &proto.DataType_YearMonthInterval_{},
+	}
+	assert.Equal(t, "Unsupported", convertProtoDataTypeToDataType(unsupportedDataType).TypeName())
+}
diff --git a/client/sql/datatype.go b/client/sql/datatype.go
index 05ab02e..e201114 100644
--- a/client/sql/datatype.go
+++ b/client/sql/datatype.go
@@ -25,6 +25,27 @@ type DataType interface {
 	TypeName() string
 }
 
+type BooleanType struct {
+}
+
+func (t BooleanType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type ByteType struct {
+}
+
+func (t ByteType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type ShortType struct {
+}
+
+func (t ShortType) TypeName() string {
+	return getDataTypeName(t)
+}
+
 type IntegerType struct {
 }
 
@@ -32,6 +53,34 @@ func (t IntegerType) TypeName() string {
 	return getDataTypeName(t)
 }
 
+type LongType struct {
+}
+
+func (t LongType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type FloatType struct {
+}
+
+func (t FloatType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type DoubleType struct {
+}
+
+func (t DoubleType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type DecimalType struct {
+}
+
+func (t DecimalType) TypeName() string {
+	return getDataTypeName(t)
+}
+
 type StringType struct {
 }
 
@@ -39,6 +88,34 @@ func (t StringType) TypeName() string {
 	return getDataTypeName(t)
 }
 
+type BinaryType struct {
+}
+
+func (t BinaryType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type TimestampType struct {
+}
+
+func (t TimestampType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type TimestampNtzType struct {
+}
+
+func (t TimestampNtzType) TypeName() string {
+	return getDataTypeName(t)
+}
+
+type DateType struct {
+}
+
+func (t DateType) TypeName() string {
+	return getDataTypeName(t)
+}
+
 type UnsupportedType struct {
 	TypeInfo any
 }
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
index 6c0b5db..c35bbeb 100644
--- a/cmd/spark-connect-example-spark-session/main.go
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -41,6 +41,7 @@ func main() {
 		log.Fatalf("Failed: %s", err.Error())
 	}
 
+	log.Printf("DataFrame from sql: select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
 	err = df.Show(100, false)
 	if err != nil {
 		log.Fatalf("Failed: %s", err.Error())
@@ -86,5 +87,19 @@ func main() {
 		log.Fatalf("Failed: %s", err.Error())
 	}
 
+	log.Printf("DataFrame from reading parquet")
+	df.Show(100, false)
+
+	err = df.CreateTempView("view1", true, false)
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	df, err = spark.Sql("select count, word from view1 order by count")
+	if err != nil {
+		log.Fatalf("Failed: %s", err.Error())
+	}
+
+	log.Printf("DataFrame from sql: select count, word from view1 order by count")
 	df.Show(100, false)
 }

