You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/05/17 15:00:26 UTC
[arrow] branch master updated: ARROW-16555: [Go][Parquet] Lift BitBlockCounter and VisitBitBlocks into shared internal utils
This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5f7a5a4574 ARROW-16555: [Go][Parquet] Lift BitBlockCounter and VisitBitBlocks into shared internal utils
5f7a5a4574 is described below
commit 5f7a5a4574526da0493fd9dfa9e0a9c573bfcf3b
Author: Matthew Topol <mt...@factset.com>
AuthorDate: Tue May 17 11:00:17 2022 -0400
ARROW-16555: [Go][Parquet] Lift BitBlockCounter and VisitBitBlocks into shared internal utils
Closes #13135 from zeroshade/arrow-16555-shared-utils
Authored-by: Matthew Topol <mt...@factset.com>
Signed-off-by: Matthew Topol <mt...@factset.com>
---
.../bitutils}/bit_block_counter.go | 41 +++++++++++++++++++++-
.../bitutils}/bit_block_counter_test.go | 10 +++---
go/parquet/internal/utils/rle.go | 4 +--
go/parquet/internal/utils/typed_rle_dict.gen.go | 28 +++++++--------
.../internal/utils/typed_rle_dict.gen.go.tmpl | 18 +++++-----
5 files changed, 71 insertions(+), 30 deletions(-)
diff --git a/go/parquet/internal/utils/bit_block_counter.go b/go/internal/bitutils/bit_block_counter.go
similarity index 87%
rename from go/parquet/internal/utils/bit_block_counter.go
rename to go/internal/bitutils/bit_block_counter.go
index 8fce635721..56ce644c3a 100644
--- a/go/parquet/internal/utils/bit_block_counter.go
+++ b/go/internal/bitutils/bit_block_counter.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package utils
+package bitutils
import (
"math"
@@ -251,3 +251,42 @@ func VisitBitBlocks(bitmap []byte, offset, length int64, visitValid func(pos int
}
}
}
+
+// VisitBitBlocksShort is a utility for easily iterating through the blocks of bits in a bitmap,
+// calling the appropriate visitValid/visitInvalid function as we iterate through the bits.
+// visitValid is called with the position of the valid bit relative to offset. Iteration stops at
+// the first error returned by either callback, and that error is returned. Don't use this inside
+// a tight loop when performance is needed; prefer manually constructing these loops in that scenario.
+func VisitBitBlocksShort(bitmap []byte, offset, length int64, visitValid func(pos int64) error, visitInvalid func() error) error {
+ counter := NewOptionalBitBlockCounter(bitmap, offset, length)
+ pos := int64(0)
+ for pos < length {
+ block := counter.NextBlock()
+ if block.AllSet() {
+ for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
+ if err := visitValid(pos); err != nil {
+ return err
+ }
+ }
+ } else if block.NoneSet() {
+ for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
+ if err := visitInvalid(); err != nil {
+ return err
+ }
+ }
+ } else {
+ for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
+ if bitutil.BitIsSet(bitmap, int(offset+pos)) {
+ if err := visitValid(pos); err != nil {
+ return err
+ }
+ } else {
+ if err := visitInvalid(); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
diff --git a/go/parquet/internal/utils/bit_block_counter_test.go b/go/internal/bitutils/bit_block_counter_test.go
similarity index 94%
rename from go/parquet/internal/utils/bit_block_counter_test.go
rename to go/internal/bitutils/bit_block_counter_test.go
index 4ef1cd16bb..db7ecb64c1 100644
--- a/go/parquet/internal/utils/bit_block_counter_test.go
+++ b/go/internal/bitutils/bit_block_counter_test.go
@@ -14,24 +14,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package utils_test
+package bitutils_test
import (
"testing"
"github.com/apache/arrow/go/v9/arrow/bitutil"
"github.com/apache/arrow/go/v9/arrow/memory"
- "github.com/apache/arrow/go/v9/parquet/internal/utils"
+ "github.com/apache/arrow/go/v9/internal/bitutils"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/rand"
)
const kWordSize = 64
-func create(nbytes, offset, length int64) (*memory.Buffer, *utils.BitBlockCounter) {
+func create(nbytes, offset, length int64) (*memory.Buffer, *bitutils.BitBlockCounter) {
buf := memory.NewResizableBuffer(memory.DefaultAllocator)
buf.Resize(int(nbytes))
- return buf, utils.NewBitBlockCounter(buf.Bytes(), offset, length)
+ return buf, bitutils.NewBitBlockCounter(buf.Bytes(), offset, length)
}
func TestOneWordBasics(t *testing.T) {
@@ -188,7 +188,7 @@ func TestFourWordsRandomData(t *testing.T) {
r.Read(buf)
checkWithOffset := func(offset int64) {
- counter := utils.NewBitBlockCounter(buf, offset, nbytes*8-offset)
+ counter := bitutils.NewBitBlockCounter(buf, offset, nbytes*8-offset)
for i := 0; i < nbytes/32; i++ {
block := counter.NextFourWords()
assert.EqualValues(t, bitutil.CountSetBits(buf, i*256+int(offset), int(block.Len)), block.Popcnt)
diff --git a/go/parquet/internal/utils/rle.go b/go/parquet/internal/utils/rle.go
index 67567adfc7..a6ffaed57c 100644
--- a/go/parquet/internal/utils/rle.go
+++ b/go/parquet/internal/utils/rle.go
@@ -221,12 +221,12 @@ func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits []by
}
converter := plainConverter{}
- blockCounter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter := bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
var (
totalProcessed int
processed int
- block BitBlockCount
+ block bitutils.BitBlockCount
err error
)
diff --git a/go/parquet/internal/utils/typed_rle_dict.gen.go b/go/parquet/internal/utils/typed_rle_dict.gen.go
index ee4568392b..e482c62f37 100644
--- a/go/parquet/internal/utils/typed_rle_dict.gen.go
+++ b/go/parquet/internal/utils/typed_rle_dict.gen.go
@@ -31,9 +31,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedInt32(dc DictionaryConverter, vals []
}
var (
- blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
- block BitBlockCount
+ block bitutils.BitBlockCount
)
for {
@@ -224,9 +224,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedInt64(dc DictionaryConverter, vals []
}
var (
- blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
- block BitBlockCount
+ block bitutils.BitBlockCount
)
for {
@@ -417,9 +417,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedInt96(dc DictionaryConverter, vals []
}
var (
- blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
- block BitBlockCount
+ block bitutils.BitBlockCount
)
for {
@@ -610,9 +610,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedFloat32(dc DictionaryConverter, vals
}
var (
- blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
- block BitBlockCount
+ block bitutils.BitBlockCount
)
for {
@@ -803,9 +803,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedFloat64(dc DictionaryConverter, vals
}
var (
- blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
- block BitBlockCount
+ block bitutils.BitBlockCount
)
for {
@@ -996,9 +996,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedByteArray(dc DictionaryConverter, val
}
var (
- blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
- block BitBlockCount
+ block bitutils.BitBlockCount
)
for {
@@ -1189,9 +1189,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedFixedLenByteArray(dc DictionaryConver
}
var (
- blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
- block BitBlockCount
+ block bitutils.BitBlockCount
)
for {
diff --git a/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl b/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
index 1f542be7fb..b4a77c8d1a 100644
--- a/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
+++ b/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
@@ -18,6 +18,8 @@ package utils
import (
"github.com/apache/arrow/go/v9/parquet"
+ "github.com/apache/arrow/go/v9/internal/bitutils"
+ "github.com/apache/arrow/go/v9/internal/utils"
)
{{range .In}}
@@ -28,9 +30,9 @@ func (r *RleDecoder) GetBatchWithDictSpaced{{.Name}}(dc DictionaryConverter, val
}
var (
- blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+ blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
- block BitBlockCount
+ block bitutils.BitBlockCount
)
for {
@@ -76,7 +78,7 @@ func (r *RleDecoder) getspaced{{.Name}}(dc DictionaryConverter, vals []{{.name}}
var indexbuffer [bufferSize]IndexType
// assume no bits to start
- bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
+ bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
validRun := bitReader.NextRun()
for read < batchSize {
if validRun.Len == 0 {
@@ -126,8 +128,8 @@ func (r *RleDecoder) getspaced{{.Name}}(dc DictionaryConverter, vals []{{.name}}
return read, nil
}
-func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals []{{.name}}, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
- batch := MinInt(MinInt(remain, int(r.litCount)), len(buf))
+func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals []{{.name}}, remain int, buf []IndexType, run bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
+ batch := utils.MinInt(utils.MinInt(remain, int(r.litCount)), len(buf))
buf = buf[:batch]
n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
@@ -145,7 +147,7 @@ func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals []{{.
)
for read < batch {
if run.Set {
- updateSize := MinInt(batch-read, int(run.Len))
+ updateSize := utils.MinInt(batch-read, int(run.Len))
if err := dc.Copy(vals, buf[read:read+updateSize]); err != nil {
return 0, 0, run, err
}
@@ -182,7 +184,7 @@ func (r *RleDecoder) GetBatchWithDict{{.Name}}(dc DictionaryConverter, vals []{{
if !dc.IsValid(idx) {
return read, nil
}
- batch := MinInt(remain, int(r.repCount))
+ batch := utils.MinInt(remain, int(r.repCount))
if err := dc.Fill(vals[:batch], idx); err != nil {
return read, err
}
@@ -190,7 +192,7 @@ func (r *RleDecoder) GetBatchWithDict{{.Name}}(dc DictionaryConverter, vals []{{
read += batch
vals = vals[batch:]
case r.litCount > 0:
- litbatch := MinInt(MinInt(remain, int(r.litCount)), 1024)
+ litbatch := utils.MinInt(utils.MinInt(remain, int(r.litCount)), 1024)
buf := indexbuffer[:litbatch]
n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
if n != litbatch {