You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2023/12/08 19:06:38 UTC
(arrow) branch main updated: GH-38506: [Go][Parquet] Add NumRows and RowGroupNumRows to pqarrow.FileWriter (#38507)
This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 140ae018f3 GH-38506: [Go][Parquet] Add NumRows and RowGroupNumRows to pqarrow.FileWriter (#38507)
140ae018f3 is described below
commit 140ae018f372ee14c9ff19f3e4c2af1b1a579f49
Author: Tim Schaub <ts...@users.noreply.github.com>
AuthorDate: Fri Dec 8 20:06:32 2023 +0100
GH-38506: [Go][Parquet] Add NumRows and RowGroupNumRows to pqarrow.FileWriter (#38507)
### Rationale for this change
When using a chunked column reader to read from one Parquet file and a chunked column writer to write to another Parquet file, it can be useful to keep track of the number of rows written.
### What changes are included in this PR?
This branch adds new `NumRows` and `RowGroupNumRows` methods to the `pqarrow.FileWriter`. `RowGroupNumRows` is somewhat similar to the existing `RowGroupTotalBytesWritten` function.
### Are these changes tested?
A new `file_writer_test.go` file is added with tests for the new methods.
### Are there any user-facing changes?
The new methods are exported and documented.
* Closes: #38506
Authored-by: Tim Schaub <ti...@planet.com>
Signed-off-by: Matt Topol <zo...@gmail.com>
---
go/parquet/pqarrow/file_writer.go | 17 +++++++
go/parquet/pqarrow/file_writer_test.go | 89 ++++++++++++++++++++++++++++++++++
2 files changed, 106 insertions(+)
diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go
index bc484ba243..1164cd690c 100644
--- a/go/parquet/pqarrow/file_writer.go
+++ b/go/parquet/pqarrow/file_writer.go
@@ -134,6 +134,23 @@ func (fw *FileWriter) RowGroupTotalBytesWritten() int64 {
return 0
}
+// RowGroupNumRows reports the row count of the current row group (0 if there is none).
+// An error is returned when the columns written so far disagree on that count.
+func (fw *FileWriter) RowGroupNumRows() (int, error) {
+	if fw.rgw == nil {
+		return 0, nil
+	}
+	return fw.rgw.NumRows()
+}
+
+// NumRows reports the total number of rows written so far, or 0 when fw.wr is nil.
+func (fw *FileWriter) NumRows() int {
+	if fw.wr == nil {
+		return 0
+	}
+	return fw.wr.NumRows()
+}
+
// WriteBuffered will either append to an existing row group or create a new one
// based on the record length and max row group length.
//
diff --git a/go/parquet/pqarrow/file_writer_test.go b/go/parquet/pqarrow/file_writer_test.go
new file mode 100644
index 0000000000..0b76733a62
--- /dev/null
+++ b/go/parquet/pqarrow/file_writer_test.go
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pqarrow_test
+
+import (
+ "bytes"
+ "strings"
+ "testing"
+
+ "github.com/apache/arrow/go/v15/arrow"
+ "github.com/apache/arrow/go/v15/arrow/array"
+ "github.com/apache/arrow/go/v15/arrow/memory"
+ "github.com/apache/arrow/go/v15/parquet"
+ "github.com/apache/arrow/go/v15/parquet/pqarrow"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestFileWriterRowGroupNumRows(t *testing.T) {
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "one", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
+ {Name: "two", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
+ }, nil)
+
+ data := `[
+ {"one": 1, "two": 2},
+ {"one": 1, "two": null},
+ {"one": null, "two": 2},
+ {"one": null, "two": null}
+ ]`
+ record, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, strings.NewReader(data))
+ require.NoError(t, err)
+
+ output := &bytes.Buffer{}
+ writerProps := parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(100))
+ writer, err := pqarrow.NewFileWriter(schema, output, writerProps, pqarrow.DefaultWriterProps())
+ require.NoError(t, err)
+
+ require.NoError(t, writer.Write(record))
+ numRows, err := writer.RowGroupNumRows()
+ require.NoError(t, err)
+ assert.Equal(t, 4, numRows)
+ require.NoError(t, writer.Close())
+}
+
+func TestFileWriterNumRows(t *testing.T) {
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "one", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
+ {Name: "two", Nullable: true, Type: arrow.PrimitiveTypes.Float64},
+ }, nil)
+
+ data := `[
+ {"one": 1, "two": 2},
+ {"one": 1, "two": null},
+ {"one": null, "two": 2},
+ {"one": null, "two": null}
+ ]`
+ record, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, strings.NewReader(data))
+ require.NoError(t, err)
+
+ maxRowGroupLength := 2
+
+ output := &bytes.Buffer{}
+ writerProps := parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(int64(maxRowGroupLength)))
+ writer, err := pqarrow.NewFileWriter(schema, output, writerProps, pqarrow.DefaultWriterProps())
+ require.NoError(t, err)
+
+ require.NoError(t, writer.Write(record))
+ rowGroupNumRows, err := writer.RowGroupNumRows()
+ require.NoError(t, err)
+ assert.Equal(t, maxRowGroupLength, rowGroupNumRows)
+
+ require.NoError(t, writer.Close())
+ assert.Equal(t, 4, writer.NumRows())
+}