You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2023/11/15 17:16:19 UTC
(arrow) branch main updated: GH-38503: [Go][Parquet] Make the arrow column writer internal (#38727)
This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1e7175db8d GH-38503: [Go][Parquet] Make the arrow column writer internal (#38727)
1e7175db8d is described below
commit 1e7175db8d78313935cd1161728e9ae9dae57c9c
Author: Tim Schaub <ts...@users.noreply.github.com>
AuthorDate: Wed Nov 15 10:16:11 2023 -0700
GH-38503: [Go][Parquet] Make the arrow column writer internal (#38727)
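This change stops exporting the Arrow column writer from the `pqarrow` package: `ArrowColumnWriter` and `NewArrowColumnWriter` become the unexported `arrowColumnWriter` and `newArrowColumnWriter`, and the `LeafCount` accessor is removed. The tests now write columns through `FileWriter.WriteColumnChunked` instead. This follows up on comments from #38581.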
* Closes: #38503
Authored-by: Tim Schaub <ti...@planet.com>
Signed-off-by: Matt Topol <zo...@gmail.com>
---
go/parquet/pqarrow/encode_arrow.go | 26 +++++------
go/parquet/pqarrow/encode_arrow_test.go | 80 +++++++++++++--------------------
go/parquet/pqarrow/file_writer.go | 2 +-
3 files changed, 44 insertions(+), 64 deletions(-)
diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go
index 4989837cd0..8926d0ba51 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -65,25 +65,25 @@ func nullableRoot(manifest *SchemaManifest, field *SchemaField) bool {
return nullable
}
-// ArrowColumnWriter is a convenience object for easily writing arrow data to a specific
+// arrowColumnWriter is a convenience object for easily writing arrow data to a specific
// set of columns in a parquet file. Since a single arrow array can itself be a nested type
// consisting of multiple columns of data, this will write to all of the appropriate leaves in
// the parquet file, allowing easy writing of nested columns.
-type ArrowColumnWriter struct {
+type arrowColumnWriter struct {
builders []*multipathLevelBuilder
leafCount int
colIdx int
rgw file.RowGroupWriter
}
-// NewArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
+// newArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
// and the provided schema manifest to determine the paths for writing the columns.
//
// Using an arrow column writer is a convenience to avoid having to process the arrow array yourself
// and determine the correct definition and repetition levels manually.
-func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) {
+func newArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (arrowColumnWriter, error) {
if data.Len() == 0 {
- return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
+ return arrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
}
var (
@@ -109,7 +109,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
}
if absPos >= int64(data.Len()) {
- return ArrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
+ return arrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
}
leafCount := calcLeafCount(data.DataType())
@@ -120,7 +120,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
schemaField, err := manifest.GetColumnField(leafColIdx)
if err != nil {
- return ArrowColumnWriter{}, err
+ return arrowColumnWriter{}, err
}
isNullable = nullableRoot(manifest, schemaField)
@@ -138,10 +138,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
if arrToWrite.Len() > 0 {
bldr, err := newMultipathLevelBuilder(arrToWrite, isNullable)
if err != nil {
- return ArrowColumnWriter{}, nil
+ return arrowColumnWriter{}, nil
}
if leafCount != bldr.leafCount() {
- return ArrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount())
+ return arrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount())
}
builders = append(builders, bldr)
}
@@ -153,14 +153,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
values += chunkWriteSize
}
- return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
+ return arrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
}
-func (acw *ArrowColumnWriter) LeafCount() int {
- return acw.leafCount
-}
-
-func (acw *ArrowColumnWriter) Write(ctx context.Context) error {
+func (acw *arrowColumnWriter) Write(ctx context.Context) error {
arrCtx := arrowCtxFromContext(ctx)
for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ {
var (
diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go
index 712a003c63..95ea644dd8 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -132,28 +132,24 @@ func TestWriteArrowCols(t *testing.T) {
tbl := makeDateTimeTypesTable(mem, false, false)
defer tbl.Release()
- psc, err := pqarrow.ToParquet(tbl.Schema(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
- require.NoError(t, err)
-
- manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
- require.NoError(t, err)
-
sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()
- writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4))))
- srgw := writer.AppendRowGroup()
- ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)
+ fileWriter, err := pqarrow.NewFileWriter(
+ tbl.Schema(),
+ sink,
+ parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4)),
+ pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)),
+ )
+ require.NoError(t, err)
- colIdx := 0
+ fileWriter.NewRowGroup()
for i := int64(0); i < tbl.NumCols(); i++ {
- acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
+ colChunk := tbl.Column(int(i)).Data()
+ err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
- require.NoError(t, acw.Write(ctx))
- colIdx = colIdx + acw.LeafCount()
}
- require.NoError(t, srgw.Close())
- require.NoError(t, writer.Close())
+ require.NoError(t, fileWriter.Close())
expected := makeDateTimeTypesTable(mem, true, false)
defer expected.Release()
@@ -235,31 +231,24 @@ func TestWriteArrowInt96(t *testing.T) {
tbl := makeDateTimeTypesTable(mem, false, false)
defer tbl.Release()
- props := pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem))
-
- psc, err := pqarrow.ToParquet(tbl.Schema(), nil, props)
- require.NoError(t, err)
-
- manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
- require.NoError(t, err)
-
sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()
- writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))
-
- srgw := writer.AppendRowGroup()
- ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
+ fileWriter, err := pqarrow.NewFileWriter(
+ tbl.Schema(),
+ sink,
+ parquet.NewWriterProperties(parquet.WithAllocator(mem)),
+ pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem)),
+ )
+ require.NoError(t, err)
- colIdx := 0
+ fileWriter.NewRowGroup()
for i := int64(0); i < tbl.NumCols(); i++ {
- acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
+ colChunk := tbl.Column(int(i)).Data()
+ err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
- require.NoError(t, acw.Write(ctx))
- colIdx += acw.LeafCount()
}
- require.NoError(t, srgw.Close())
- require.NoError(t, writer.Close())
+ require.NoError(t, fileWriter.Close())
expected := makeDateTimeTypesTable(mem, false, false)
defer expected.Release()
@@ -296,33 +285,28 @@ func TestWriteArrowInt96(t *testing.T) {
func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer {
sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()
- wrprops := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
- psc, err := pqarrow.ToParquet(tbl.Schema(), wrprops, props)
- require.NoError(t, err)
- manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
+ fileWriter, err := pqarrow.NewFileWriter(
+ tbl.Schema(),
+ sink,
+ parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)),
+ props,
+ )
require.NoError(t, err)
- writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(wrprops))
- ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
-
offset := int64(0)
for offset < tbl.NumRows() {
sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
- srgw := writer.AppendRowGroup()
- colIdx := 0
+ fileWriter.NewRowGroup()
for i := 0; i < int(tbl.NumCols()); i++ {
- col := tbl.Column(i)
- acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx)
+ colChunk := tbl.Column(i).Data()
+ err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
- require.NoError(t, acw.Write(ctx))
- colIdx = colIdx + acw.LeafCount()
}
- srgw.Close()
offset += sz
}
- writer.Close()
+ require.NoError(t, fileWriter.Close())
return sink.Finish()
}
diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go
index 21f16c0b67..bc484ba243 100644
--- a/go/parquet/pqarrow/file_writer.go
+++ b/go/parquet/pqarrow/file_writer.go
@@ -305,7 +305,7 @@ func (fw *FileWriter) Close() error {
// building of writing columns to a file via arrow data without needing to already have
// a record or table.
func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error {
- acw, err := NewArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
+ acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
if err != nil {
return err
}