You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ze...@apache.org on 2022/05/17 15:00:26 UTC

[arrow] branch master updated: ARROW-16555: [Go][Parquet] Lift BitBlockCounter and VisitBitBlocks into shared internal utils

This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f7a5a4574 ARROW-16555: [Go][Parquet] Lift BitBlockCounter and VisitBitBlocks into shared internal utils
5f7a5a4574 is described below

commit 5f7a5a4574526da0493fd9dfa9e0a9c573bfcf3b
Author: Matthew Topol <mt...@factset.com>
AuthorDate: Tue May 17 11:00:17 2022 -0400

    ARROW-16555: [Go][Parquet] Lift BitBlockCounter and VisitBitBlocks into shared internal utils
    
    Closes #13135 from zeroshade/arrow-16555-shared-utils
    
    Authored-by: Matthew Topol <mt...@factset.com>
    Signed-off-by: Matthew Topol <mt...@factset.com>
---
 .../bitutils}/bit_block_counter.go                 | 41 +++++++++++++++++++++-
 .../bitutils}/bit_block_counter_test.go            | 10 +++---
 go/parquet/internal/utils/rle.go                   |  4 +--
 go/parquet/internal/utils/typed_rle_dict.gen.go    | 28 +++++++--------
 .../internal/utils/typed_rle_dict.gen.go.tmpl      | 18 +++++-----
 5 files changed, 71 insertions(+), 30 deletions(-)

diff --git a/go/parquet/internal/utils/bit_block_counter.go b/go/internal/bitutils/bit_block_counter.go
similarity index 87%
rename from go/parquet/internal/utils/bit_block_counter.go
rename to go/internal/bitutils/bit_block_counter.go
index 8fce635721..56ce644c3a 100644
--- a/go/parquet/internal/utils/bit_block_counter.go
+++ b/go/internal/bitutils/bit_block_counter.go
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package utils
+package bitutils
 
 import (
 	"math"
@@ -251,3 +251,42 @@ func VisitBitBlocks(bitmap []byte, offset, length int64, visitValid func(pos int
 		}
 	}
 }
+
+// VisitBitBlocksShort iterates through the blocks of bits in a bitmap, calling visitValid
+// for each set bit and visitInvalid for each unset bit. It short-circuits: the first non-nil
+// error returned by either callback stops the iteration and is returned to the caller;
+// nil is returned otherwise. visitValid receives the bit's position relative to offset.
+// Don't use this inside a tight loop when performance is needed; hand-write the loops instead.
+func VisitBitBlocksShort(bitmap []byte, offset, length int64, visitValid func(pos int64) error, visitInvalid func() error) error {
+	counter := NewOptionalBitBlockCounter(bitmap, offset, length)
+	pos := int64(0)
+	for pos < length {
+		block := counter.NextBlock()
+		if block.AllSet() {
+			for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
+				if err := visitValid(pos); err != nil {
+					return err
+				}
+			}
+		} else if block.NoneSet() {
+			for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
+				if err := visitInvalid(); err != nil {
+					return err
+				}
+			}
+		} else {
+			for i := 0; i < int(block.Len); i, pos = i+1, pos+1 {
+				if bitutil.BitIsSet(bitmap, int(offset+pos)) {
+					if err := visitValid(pos); err != nil {
+						return err
+					}
+				} else {
+					if err := visitInvalid(); err != nil {
+						return err
+					}
+				}
+			}
+		}
+	}
+	return nil
+}
diff --git a/go/parquet/internal/utils/bit_block_counter_test.go b/go/internal/bitutils/bit_block_counter_test.go
similarity index 94%
rename from go/parquet/internal/utils/bit_block_counter_test.go
rename to go/internal/bitutils/bit_block_counter_test.go
index 4ef1cd16bb..db7ecb64c1 100644
--- a/go/parquet/internal/utils/bit_block_counter_test.go
+++ b/go/internal/bitutils/bit_block_counter_test.go
@@ -14,24 +14,24 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package utils_test
+package bitutils_test
 
 import (
 	"testing"
 
 	"github.com/apache/arrow/go/v9/arrow/bitutil"
 	"github.com/apache/arrow/go/v9/arrow/memory"
-	"github.com/apache/arrow/go/v9/parquet/internal/utils"
+	"github.com/apache/arrow/go/v9/internal/bitutils"
 	"github.com/stretchr/testify/assert"
 	"golang.org/x/exp/rand"
 )
 
 const kWordSize = 64
 
-func create(nbytes, offset, length int64) (*memory.Buffer, *utils.BitBlockCounter) {
+func create(nbytes, offset, length int64) (*memory.Buffer, *bitutils.BitBlockCounter) {
 	buf := memory.NewResizableBuffer(memory.DefaultAllocator)
 	buf.Resize(int(nbytes))
-	return buf, utils.NewBitBlockCounter(buf.Bytes(), offset, length)
+	return buf, bitutils.NewBitBlockCounter(buf.Bytes(), offset, length)
 }
 
 func TestOneWordBasics(t *testing.T) {
@@ -188,7 +188,7 @@ func TestFourWordsRandomData(t *testing.T) {
 	r.Read(buf)
 
 	checkWithOffset := func(offset int64) {
-		counter := utils.NewBitBlockCounter(buf, offset, nbytes*8-offset)
+		counter := bitutils.NewBitBlockCounter(buf, offset, nbytes*8-offset)
 		for i := 0; i < nbytes/32; i++ {
 			block := counter.NextFourWords()
 			assert.EqualValues(t, bitutil.CountSetBits(buf, i*256+int(offset), int(block.Len)), block.Popcnt)
diff --git a/go/parquet/internal/utils/rle.go b/go/parquet/internal/utils/rle.go
index 67567adfc7..a6ffaed57c 100644
--- a/go/parquet/internal/utils/rle.go
+++ b/go/parquet/internal/utils/rle.go
@@ -221,12 +221,12 @@ func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits []by
 	}
 
 	converter := plainConverter{}
-	blockCounter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+	blockCounter := bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
 
 	var (
 		totalProcessed int
 		processed      int
-		block          BitBlockCount
+		block          bitutils.BitBlockCount
 		err            error
 	)
 
diff --git a/go/parquet/internal/utils/typed_rle_dict.gen.go b/go/parquet/internal/utils/typed_rle_dict.gen.go
index ee4568392b..e482c62f37 100644
--- a/go/parquet/internal/utils/typed_rle_dict.gen.go
+++ b/go/parquet/internal/utils/typed_rle_dict.gen.go
@@ -31,9 +31,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedInt32(dc DictionaryConverter, vals []
 	}
 
 	var (
-		blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+		blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
 		processed    = 0
-		block        BitBlockCount
+		block        bitutils.BitBlockCount
 	)
 
 	for {
@@ -224,9 +224,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedInt64(dc DictionaryConverter, vals []
 	}
 
 	var (
-		blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+		blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
 		processed    = 0
-		block        BitBlockCount
+		block        bitutils.BitBlockCount
 	)
 
 	for {
@@ -417,9 +417,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedInt96(dc DictionaryConverter, vals []
 	}
 
 	var (
-		blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+		blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
 		processed    = 0
-		block        BitBlockCount
+		block        bitutils.BitBlockCount
 	)
 
 	for {
@@ -610,9 +610,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedFloat32(dc DictionaryConverter, vals
 	}
 
 	var (
-		blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+		blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
 		processed    = 0
-		block        BitBlockCount
+		block        bitutils.BitBlockCount
 	)
 
 	for {
@@ -803,9 +803,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedFloat64(dc DictionaryConverter, vals
 	}
 
 	var (
-		blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+		blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
 		processed    = 0
-		block        BitBlockCount
+		block        bitutils.BitBlockCount
 	)
 
 	for {
@@ -996,9 +996,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedByteArray(dc DictionaryConverter, val
 	}
 
 	var (
-		blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+		blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
 		processed    = 0
-		block        BitBlockCount
+		block        bitutils.BitBlockCount
 	)
 
 	for {
@@ -1189,9 +1189,9 @@ func (r *RleDecoder) GetBatchWithDictSpacedFixedLenByteArray(dc DictionaryConver
 	}
 
 	var (
-		blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+		blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
 		processed    = 0
-		block        BitBlockCount
+		block        bitutils.BitBlockCount
 	)
 
 	for {
diff --git a/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl b/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
index 1f542be7fb..b4a77c8d1a 100644
--- a/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
+++ b/go/parquet/internal/utils/typed_rle_dict.gen.go.tmpl
@@ -18,6 +18,8 @@ package utils
 
 import (
   "github.com/apache/arrow/go/v9/parquet"
+  "github.com/apache/arrow/go/v9/internal/bitutils"
+  "github.com/apache/arrow/go/v9/internal/utils"
 )
 
 {{range .In}}
@@ -28,9 +30,9 @@ func (r *RleDecoder) GetBatchWithDictSpaced{{.Name}}(dc DictionaryConverter, val
   }
 
   var (
-    blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
+    blockCounter = bitutils.NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
     processed = 0
-    block BitBlockCount
+    block bitutils.BitBlockCount
   )
 
   for {
@@ -76,7 +78,7 @@ func (r *RleDecoder) getspaced{{.Name}}(dc DictionaryConverter, vals []{{.name}}
   var indexbuffer [bufferSize]IndexType
 
   // assume no bits to start
-  bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
+  bitReader := bitutils.NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
   validRun := bitReader.NextRun()
   for read < batchSize {
     if validRun.Len == 0 {
@@ -126,8 +128,8 @@ func (r *RleDecoder) getspaced{{.Name}}(dc DictionaryConverter, vals []{{.name}}
   return read, nil
 }
 
-func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals []{{.name}}, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
-  batch := MinInt(MinInt(remain, int(r.litCount)), len(buf))
+func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals []{{.name}}, remain int, buf []IndexType, run bitutils.BitRun, bitRdr bitutils.BitRunReader) (int, int, bitutils.BitRun, error) {
+  batch := utils.MinInt(utils.MinInt(remain, int(r.litCount)), len(buf))
 	buf = buf[:batch]
 
 	n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
@@ -145,7 +147,7 @@ func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals []{{.
 	)
 	for read < batch {
 		if run.Set {
-			updateSize := MinInt(batch-read, int(run.Len))
+			updateSize := utils.MinInt(batch-read, int(run.Len))
 			if err := dc.Copy(vals, buf[read:read+updateSize]); err != nil {
 				return 0, 0, run, err
 			}
@@ -182,7 +184,7 @@ func (r *RleDecoder) GetBatchWithDict{{.Name}}(dc DictionaryConverter, vals []{{
       if !dc.IsValid(idx) {
         return read, nil
       }
-      batch := MinInt(remain, int(r.repCount))
+      batch := utils.MinInt(remain, int(r.repCount))
       if err := dc.Fill(vals[:batch], idx); err != nil {
         return read, err
       }
@@ -190,7 +192,7 @@ func (r *RleDecoder) GetBatchWithDict{{.Name}}(dc DictionaryConverter, vals []{{
       read += batch
       vals = vals[batch:]
     case r.litCount > 0:
-      litbatch := MinInt(MinInt(remain, int(r.litCount)), 1024)
+      litbatch := utils.MinInt(utils.MinInt(remain, int(r.litCount)), 1024)
       buf := indexbuffer[:litbatch]
       n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
       if n != litbatch {