You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2023/11/15 17:16:19 UTC

(arrow) branch main updated: GH-38503: [Go][Parquet] Make the arrow column writer internal (#38727)

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e7175db8d GH-38503: [Go][Parquet] Make the arrow column writer internal (#38727)
1e7175db8d is described below

commit 1e7175db8d78313935cd1161728e9ae9dae57c9c
Author: Tim Schaub <ts...@users.noreply.github.com>
AuthorDate: Wed Nov 15 10:16:11 2023 -0700

    GH-38503: [Go][Parquet] Make the arrow column writer internal (#38727)
    
    This makes the Arrow column writer internal to the `pqarrow` package (it is no longer exported). It follows up on comments from #38581.
    * Closes: #38503
    
    Authored-by: Tim Schaub <ti...@planet.com>
    Signed-off-by: Matt Topol <zo...@gmail.com>
---
 go/parquet/pqarrow/encode_arrow.go      | 26 +++++------
 go/parquet/pqarrow/encode_arrow_test.go | 80 +++++++++++++--------------------
 go/parquet/pqarrow/file_writer.go       |  2 +-
 3 files changed, 44 insertions(+), 64 deletions(-)

diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go
index 4989837cd0..8926d0ba51 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -65,25 +65,25 @@ func nullableRoot(manifest *SchemaManifest, field *SchemaField) bool {
 	return nullable
 }
 
-// ArrowColumnWriter is a convenience object for easily writing arrow data to a specific
+// arrowColumnWriter is a convenience object for easily writing arrow data to a specific
 // set of columns in a parquet file. Since a single arrow array can itself be a nested type
 // consisting of multiple columns of data, this will write to all of the appropriate leaves in
 // the parquet file, allowing easy writing of nested columns.
-type ArrowColumnWriter struct {
+type arrowColumnWriter struct {
 	builders  []*multipathLevelBuilder
 	leafCount int
 	colIdx    int
 	rgw       file.RowGroupWriter
 }
 
-// NewArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
+// newArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
 // and the provided schema manifest to determine the paths for writing the columns.
 //
 // Using an arrow column writer is a convenience to avoid having to process the arrow array yourself
 // and determine the correct definition and repetition levels manually.
-func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) {
+func newArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (arrowColumnWriter, error) {
 	if data.Len() == 0 {
-		return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
+		return arrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
 	}
 
 	var (
@@ -109,7 +109,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
 	}
 
 	if absPos >= int64(data.Len()) {
-		return ArrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
+		return arrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
 	}
 
 	leafCount := calcLeafCount(data.DataType())
@@ -120,7 +120,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
 
 	schemaField, err := manifest.GetColumnField(leafColIdx)
 	if err != nil {
-		return ArrowColumnWriter{}, err
+		return arrowColumnWriter{}, err
 	}
 	isNullable = nullableRoot(manifest, schemaField)
 
@@ -138,10 +138,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
 		if arrToWrite.Len() > 0 {
 			bldr, err := newMultipathLevelBuilder(arrToWrite, isNullable)
 			if err != nil {
-				return ArrowColumnWriter{}, nil
+				return arrowColumnWriter{}, nil
 			}
 			if leafCount != bldr.leafCount() {
-				return ArrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount())
+				return arrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount())
 			}
 			builders = append(builders, bldr)
 		}
@@ -153,14 +153,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
 		values += chunkWriteSize
 	}
 
-	return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
+	return arrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
 }
 
-func (acw *ArrowColumnWriter) LeafCount() int {
-	return acw.leafCount
-}
-
-func (acw *ArrowColumnWriter) Write(ctx context.Context) error {
+func (acw *arrowColumnWriter) Write(ctx context.Context) error {
 	arrCtx := arrowCtxFromContext(ctx)
 	for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ {
 		var (
diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go
index 712a003c63..95ea644dd8 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -132,28 +132,24 @@ func TestWriteArrowCols(t *testing.T) {
 	tbl := makeDateTimeTypesTable(mem, false, false)
 	defer tbl.Release()
 
-	psc, err := pqarrow.ToParquet(tbl.Schema(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
-	require.NoError(t, err)
-
-	manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
-	require.NoError(t, err)
-
 	sink := encoding.NewBufferWriter(0, mem)
 	defer sink.Release()
-	writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4))))
 
-	srgw := writer.AppendRowGroup()
-	ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)
+	fileWriter, err := pqarrow.NewFileWriter(
+		tbl.Schema(),
+		sink,
+		parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4)),
+		pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)),
+	)
+	require.NoError(t, err)
 
-	colIdx := 0
+	fileWriter.NewRowGroup()
 	for i := int64(0); i < tbl.NumCols(); i++ {
-		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
+		colChunk := tbl.Column(int(i)).Data()
+		err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
 		require.NoError(t, err)
-		require.NoError(t, acw.Write(ctx))
-		colIdx = colIdx + acw.LeafCount()
 	}
-	require.NoError(t, srgw.Close())
-	require.NoError(t, writer.Close())
+	require.NoError(t, fileWriter.Close())
 
 	expected := makeDateTimeTypesTable(mem, true, false)
 	defer expected.Release()
@@ -235,31 +231,24 @@ func TestWriteArrowInt96(t *testing.T) {
 	tbl := makeDateTimeTypesTable(mem, false, false)
 	defer tbl.Release()
 
-	props := pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem))
-
-	psc, err := pqarrow.ToParquet(tbl.Schema(), nil, props)
-	require.NoError(t, err)
-
-	manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
-	require.NoError(t, err)
-
 	sink := encoding.NewBufferWriter(0, mem)
 	defer sink.Release()
 
-	writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))
-
-	srgw := writer.AppendRowGroup()
-	ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
+	fileWriter, err := pqarrow.NewFileWriter(
+		tbl.Schema(),
+		sink,
+		parquet.NewWriterProperties(parquet.WithAllocator(mem)),
+		pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem)),
+	)
+	require.NoError(t, err)
 
-	colIdx := 0
+	fileWriter.NewRowGroup()
 	for i := int64(0); i < tbl.NumCols(); i++ {
-		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
+		colChunk := tbl.Column(int(i)).Data()
+		err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
 		require.NoError(t, err)
-		require.NoError(t, acw.Write(ctx))
-		colIdx += acw.LeafCount()
 	}
-	require.NoError(t, srgw.Close())
-	require.NoError(t, writer.Close())
+	require.NoError(t, fileWriter.Close())
 
 	expected := makeDateTimeTypesTable(mem, false, false)
 	defer expected.Release()
@@ -296,33 +285,28 @@ func TestWriteArrowInt96(t *testing.T) {
 func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer {
 	sink := encoding.NewBufferWriter(0, mem)
 	defer sink.Release()
-	wrprops := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
-	psc, err := pqarrow.ToParquet(tbl.Schema(), wrprops, props)
-	require.NoError(t, err)
 
-	manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
+	fileWriter, err := pqarrow.NewFileWriter(
+		tbl.Schema(),
+		sink,
+		parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)),
+		props,
+	)
 	require.NoError(t, err)
 
-	writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(wrprops))
-	ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
-
 	offset := int64(0)
 	for offset < tbl.NumRows() {
 		sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
-		srgw := writer.AppendRowGroup()
-		colIdx := 0
+		fileWriter.NewRowGroup()
 		for i := 0; i < int(tbl.NumCols()); i++ {
-			col := tbl.Column(i)
-			acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx)
+			colChunk := tbl.Column(i).Data()
+			err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
 			require.NoError(t, err)
-			require.NoError(t, acw.Write(ctx))
-			colIdx = colIdx + acw.LeafCount()
 		}
-		srgw.Close()
 		offset += sz
 	}
-	writer.Close()
 
+	require.NoError(t, fileWriter.Close())
 	return sink.Finish()
 }
 
diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go
index 21f16c0b67..bc484ba243 100644
--- a/go/parquet/pqarrow/file_writer.go
+++ b/go/parquet/pqarrow/file_writer.go
@@ -305,7 +305,7 @@ func (fw *FileWriter) Close() error {
 // building of writing columns to a file via arrow data without needing to already have
 // a record or table.
 func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error {
-	acw, err := NewArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
+	acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
 	if err != nil {
 		return err
 	}