You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/21 05:23:07 UTC
[skywalking-banyandb] branch kv created (now c994206)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
at c994206 Fix some flaws in kv
This branch includes the following new commits:
new c994206 Fix some flaws in kv
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 01/01: Fix some flaws in kv
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c99420652e256111d0e5cdcf9385fcc4130c7a42
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Nov 21 05:20:09 2022 +0000
Fix some flaws in kv
* Correct int encoding disorder
* Add a print context helper for debugging encoding issues
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 23 +++++
banyand/kv/kv.go | 2 +
banyand/measure/measure_query.go | 12 ++-
banyand/tsdb/block.go | 5 +
banyand/tsdb/series.go | 7 +-
banyand/tsdb/series_seek.go | 69 +++++++++++++
banyand/tsdb/series_seek_sort.go | 15 ++-
pkg/encoding/encoding.go | 2 +
pkg/encoding/int.go | 43 +++++---
pkg/encoding/int_test.go | 110 ++++++++++++++-------
pkg/encoding/plain.go | 9 ++
pkg/index/iterator.go | 4 +
pkg/pb/v1/write.go | 3 +-
.../measure/measure_plan_indexscan_local.go | 2 +-
14 files changed, 244 insertions(+), 62 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 4482721..9ef467b 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -48,6 +48,25 @@ type badgerTSS struct {
badger.TSet
}
+func (b *badgerTSS) Context(key []byte, ts uint64, n int) (pre Iterator, next Iterator) {
+ preOpts := badger.DefaultIteratorOptions
+ preOpts.PrefetchSize = n
+ preOpts.PrefetchValues = false
+ preOpts.Prefix = key
+ preOpts.Reverse = false
+ nextOpts := badger.DefaultIteratorOptions
+ nextOpts.PrefetchSize = n
+ nextOpts.PrefetchValues = false
+ nextOpts.Prefix = key
+ nextOpts.Reverse = true
+ seekKey := y.KeyWithTs(key, ts)
+ preIter := b.db.NewIterator(preOpts)
+ preIter.Seek(seekKey)
+ nextIter := b.db.NewIterator(nextOpts)
+ nextIter.Seek(seekKey)
+ return &iterator{delegated: preIter}, &iterator{delegated: nextIter, reverse: true}
+}
+
func (b *badgerTSS) Stats() (s observability.Statistics) {
return badgerStats(b.db)
}
@@ -190,6 +209,10 @@ func (i *iterator) Key() []byte {
return y.ParseKey(i.delegated.Key())
}
+func (i *iterator) RawKey() []byte {
+ return i.delegated.Key()
+}
+
func (i *iterator) Val() []byte {
return y.Copy(i.delegated.Value().Value)
}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 90944a2..5dd9e53 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -80,6 +80,7 @@ type TimeSeriesWriter interface {
type TimeSeriesReader interface {
// Get a value by its key and timestamp/version
Get(key []byte, ts uint64) ([]byte, error)
+ Context(key []byte, ts uint64, n int) (pre, next Iterator)
}
// TimeSeriesStore is time series storage
@@ -140,6 +141,7 @@ type Iterator interface {
Rewind()
Seek(key []byte)
Key() []byte
+ RawKey() []byte
Val() []byte
Valid() bool
Close() error
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..4581f3f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -114,10 +114,14 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
}
func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
- familyRawBytes, err := item.Family(familyIdentity(family, pbv1.TagFlag))
+ fid := familyIdentity(family, pbv1.TagFlag)
+ familyRawBytes, err := item.Family(fid)
if err != nil {
return nil, errors.Wrapf(err, "measure %s.%s parse family %s", s.name, s.group, family)
}
+ if len(familyRawBytes) < 1 {
+ item.PrintContext(s.l.Named("tag-family"), fid, 10)
+ }
tagFamily := &modelv1.TagFamilyForWrite{}
err = proto.Unmarshal(familyRawBytes, tagFamily)
if err != nil {
@@ -155,10 +159,14 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
break
}
}
- bytes, err := item.Family(familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval)))
+ fid := familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval))
+ bytes, err := item.Family(fid)
if err != nil {
return nil, err
}
+ if len(bytes) < 1 {
+ item.PrintContext(s.l.Named("field"), fid, 10)
+ }
fieldValue, err := pbv1.DecodeFieldValue(bytes, fieldSpec)
if err != nil {
return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 5ba3331..bdfde12 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -322,6 +322,7 @@ type BlockDelegate interface {
writeLSMIndex(fields []index.Field, id common.ItemID) error
writeInvertedIndex(fields []index.Field, id common.ItemID) error
dataReader() kv.TimeSeriesReader
+ decoderPool() encoding.SeriesDecoderPool
lsmIndexReader() index.Searcher
invertedIndexReader() index.Searcher
primaryIndexReader() index.FieldIterable
@@ -336,6 +337,10 @@ type bDelegate struct {
delegate *block
}
+func (d *bDelegate) decoderPool() encoding.SeriesDecoderPool {
+ return d.delegate.encodingMethod.DecoderPool
+}
+
func (d *bDelegate) dataReader() kv.TimeSeriesReader {
return d.delegate.store
}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 09d62aa..4d44f30 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -104,9 +104,10 @@ func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, err
return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
}
return &item{
- data: b.dataReader(),
- itemID: id.ID,
- seriesID: s.id,
+ data: b.dataReader(),
+ itemID: id.ID,
+ seriesID: s.id,
+ decoderPool: b.decoderPool(),
}, b, nil
}
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index b3f08e1..18db6e2 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,11 +18,19 @@
package tsdb
import (
+ "bytes"
+ "encoding/hex"
+ "time"
+
+ "github.com/dgraph-io/badger/v3/y"
+
"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
type Iterator interface {
@@ -33,6 +41,7 @@ type Iterator interface {
type Item interface {
Family(family []byte) ([]byte, error)
+ PrintContext(l *logger.Logger, family []byte, n int)
Val() ([]byte, error)
ID() common.ItemID
SortedField() []byte
@@ -59,6 +68,7 @@ type seekerBuilder struct {
order modelv1.Sort
indexRuleForSorting *databasev1.IndexRule
rangeOptsForSorting index.RangeOpts
+ l *logger.Logger
}
func (s *seekerBuilder) Build() (Seeker, error) {
@@ -75,6 +85,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
func newSeekerBuilder(s *seriesSpan) SeekerBuilder {
return &seekerBuilder{
seriesSpan: s,
+ l: logger.GetLogger("seeker-builder"),
}
}
@@ -101,6 +112,7 @@ type item struct {
data kv.TimeSeriesReader
seriesID common.SeriesID
sortedField []byte
+ decoderPool encoding.SeriesDecoderPool
}
func (i *item) Time() uint64 {
@@ -119,6 +131,63 @@ func (i *item) Family(family []byte) ([]byte, error) {
return i.data.Get(d.marshal(), uint64(i.itemID))
}
+func (i *item) PrintContext(l *logger.Logger, family []byte, n int) {
+ decoder := i.decoderPool.Get(family)
+ defer i.decoderPool.Put(decoder)
+ d := dataBucket{
+ seriesID: i.seriesID,
+ family: family,
+ }
+ key := d.marshal()
+ pre, next := i.data.Context(key, uint64(i.itemID), n)
+ defer pre.Close()
+ defer next.Close()
+ j := 0
+ currentTS := uint64(i.itemID)
+
+ each := func(iter kv.Iterator) {
+ if !bytes.Equal(key, iter.Key()) {
+ return
+ }
+ j++
+
+ ts := y.ParseTs(iter.RawKey())
+
+ logEvent := l.Info().Int("i", j).
+ Time("ts", time.Unix(0, int64(y.ParseTs(iter.RawKey()))))
+ if err := decoder.Decode(family, iter.Val()); err != nil {
+ logEvent = logEvent.Str("loc", "mem")
+ if ts == currentTS {
+ logEvent = logEvent.Bool("at", true)
+ }
+ } else {
+ start, end := decoder.Range()
+ logEvent = logEvent.Time("start", time.Unix(0, int64(start))).
+ Time("end", time.Unix(0, int64(end))).Int("num", decoder.Len()).Str("loc", "table")
+ if start <= currentTS && currentTS <= end {
+ if dd, err := decoder.Get(currentTS); err == nil && len(dd) > 0 {
+ logEvent = logEvent.Bool("at", true)
+ }
+ }
+ }
+ logEvent.Send()
+ }
+
+ s := hex.EncodeToString(key)
+ if len(s) > 7 {
+ s = s[:7]
+ }
+ l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print previous lines")
+ for ; pre.Valid() && j < n; pre.Next() {
+ each(pre)
+ }
+ j = 0
+ l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print next lines")
+ for ; next.Valid() && j < n; next.Next() {
+ each(next)
+ }
+}
+
func (i *item) Val() ([]byte, error) {
d := dataBucket{
seriesID: i.seriesID,
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 76bc194..910499e 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -28,6 +28,7 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -88,7 +89,8 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) {
return nil, err
}
if inner != nil {
- series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
+ series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+ s.seriesSpan.seriesID, filters))
}
}
return
@@ -134,9 +136,11 @@ func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, error) {
return nil, err
}
if filter == nil {
- delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, emptyFilters))
+ delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+ s.seriesSpan.seriesID, emptyFilters))
} else {
- delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, []filterFn{filter}))
+ delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+ s.seriesSpan.seriesID, []filterFn{filter}))
}
}
}
@@ -156,6 +160,7 @@ type searcherIterator struct {
curKey []byte
cur posting.Iterator
data kv.TimeSeriesReader
+ decoderPool encoding.SeriesDecoderPool
seriesID common.SeriesID
filters []filterFn
l *logger.Logger
@@ -193,6 +198,7 @@ func (s *searcherIterator) Val() Item {
itemID: s.cur.Current(),
data: s.data,
seriesID: s.seriesID,
+ decoderPool: s.decoderPool,
}
}
@@ -201,7 +207,7 @@ func (s *searcherIterator) Close() error {
}
func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
- seriesID common.SeriesID, filters []filterFn,
+ decoderPool encoding.SeriesDecoderPool, seriesID common.SeriesID, filters []filterFn,
) Iterator {
return &searcherIterator{
fieldIterator: fieldIterator,
@@ -209,6 +215,7 @@ func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, da
seriesID: seriesID,
filters: filters,
l: l,
+ decoderPool: decoderPool,
}
}
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 0382ee2..efc1f6e 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -79,6 +79,8 @@ type SeriesDecoder interface {
Get(ts uint64) ([]byte, error)
// Iterator returns a SeriesIterator
Iterator() SeriesIterator
+ // Range returns the start and end time of this series
+ Range() (start, end uint64)
}
// SeriesIterator iterates time series data
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index c9e1991..c9a48a4 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -138,8 +138,10 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
if ie.startTime == 0 {
ie.startTime = ts
ie.prevTime = ts
+ } else if ie.startTime > ts {
+ ie.startTime = ts
}
- gap := int(ts) - int(ie.prevTime)
+ gap := int(ie.prevTime) - int(ts)
if gap < 0 {
return
}
@@ -166,13 +168,15 @@ func (ie *intEncoder) Reset(key []byte) {
ie.interval = ie.fn(key)
ie.startTime = 0
ie.prevTime = 0
+ ie.num = 0
+ ie.values = NewXOREncoder(ie.bw)
}
func (ie *intEncoder) Encode() ([]byte, error) {
ie.bw.Flush()
buffWriter := buffer.NewBufferWriter(ie.buff)
buffWriter.PutUint64(ie.startTime)
- buffWriter.PutUint16(uint16(ie.size))
+ buffWriter.PutUint16(uint16(ie.num))
bb := buffWriter.Bytes()
encodedSize.WithLabelValues(ie.name, "int").Add(float64(len(bb)))
return bb, nil
@@ -195,6 +199,9 @@ type intDecoder struct {
}
func (i *intDecoder) Decode(key, data []byte) error {
+ if len(data) < 10 {
+ return ErrInvalidValue
+ }
i.interval = i.fn(key)
i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -219,29 +226,33 @@ func (i intDecoder) Get(ts uint64) ([]byte, error) {
return zeroBytes, nil
}
+func (i intDecoder) Range() (start, end uint64) {
+ return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
+}
+
func (i intDecoder) Iterator() SeriesIterator {
br := bit.NewReader(bytes.NewReader(i.area))
return &intIterator{
- startTime: i.startTime,
- interval: int(i.interval),
- br: br,
- values: NewXORDecoder(br),
- size: i.size,
+ endTime: i.startTime + uint64(i.num*int(i.interval)),
+ interval: int(i.interval),
+ br: br,
+ values: NewXORDecoder(br),
+ size: i.num,
}
}
var (
_ SeriesIterator = (*intIterator)(nil)
- zeroBytes = convert.Int64ToBytes(0)
- Zero = convert.BytesToUint64(zeroBytes)
+ zeroBytes = convert.Uint64ToBytes(zero)
+ zero = convert.BytesToUint64(convert.Int64ToBytes(0))
)
type intIterator struct {
- startTime uint64
- interval int
- size int
- br *bit.Reader
- values *XORDecoder
+ endTime uint64
+ interval int
+ size int
+ br *bit.Reader
+ values *XORDecoder
currVal uint64
currTime uint64
@@ -266,10 +277,10 @@ func (i *intIterator) Next() bool {
i.currVal = i.values.Value()
}
} else {
- i.currVal = Zero
+ i.currVal = zero
}
- i.currTime = i.startTime + uint64(i.interval*i.index)
i.index++
+ i.currTime = i.endTime - uint64(i.interval*i.index)
return true
}
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
index 657bbe5..1f08278 100644
--- a/pkg/encoding/int_test.go
+++ b/pkg/encoding/int_test.go
@@ -28,8 +28,10 @@ import (
func TestNewIntEncoderAndDecoder(t *testing.T) {
type tsData struct {
- ts []uint64
- data []int64
+ ts []uint64
+ data []int64
+ start uint64
+ end uint64
}
tests := []struct {
name string
@@ -39,45 +41,53 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
{
name: "golden path",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
data: []int64{7, 8, 7, 9},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 8, 7, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
{
name: "more than the size",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
data: []int64{7, 8, 7, 9, 6},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 8, 7, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
{
name: "less than the size",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
data: []int64{7, 8, 7},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
- data: []int64{7, 8, 7},
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []int64{7, 8, 7},
+ start: uint64(time.Minute),
+ end: uint64(3 * time.Minute),
},
},
{
name: "empty slot in the middle",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
data: []int64{7, 9},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 0, 0, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 0, 0, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
}
@@ -93,36 +103,49 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
at := assert.New(t)
encoder := encoderPool.Get(key)
+ defer encoderPool.Put(encoder)
decoder := decoderPool.Get(key)
+ defer decoderPool.Put(decoder)
encoder.Reset(key)
+ isFull := false
for i, v := range tt.args.ts {
encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
if encoder.IsFull() {
+ isFull = true
break
}
}
bb, err := encoder.Encode()
at.NoError(err)
+
+ at.Equal(tt.want.start, encoder.StartTime())
at.NoError(decoder.Decode(key, bb))
- at.True(decoder.IsFull())
- iter := decoder.Iterator()
- for i, t := range tt.want.ts {
- at.True(iter.Next())
+ start, end := decoder.Range()
+ at.Equal(tt.want.start, start)
+ at.Equal(tt.want.end, end)
+ if isFull {
+ at.True(decoder.IsFull())
+ }
+ i := 0
+ for iter := decoder.Iterator(); iter.Next(); i++ {
at.NoError(iter.Error())
at.Equal(tt.want.ts[i], iter.Time())
at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
- v, err := decoder.Get(t)
+ v, err := decoder.Get(iter.Time())
at.NoError(err)
at.Equal(tt.want.data[i], convert.BytesToInt64(v))
}
+ at.Equal(len(tt.want.ts), i)
})
}
}
func TestNewIntDecoderGet(t *testing.T) {
type tsData struct {
- ts []uint64
- data []int64
+ ts []uint64
+ data []int64
+ start uint64
+ end uint64
}
tests := []struct {
name string
@@ -132,45 +155,53 @@ func TestNewIntDecoderGet(t *testing.T) {
{
name: "golden path",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
data: []int64{7, 8, 7, 9},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 8, 7, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
{
name: "more than the size",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
data: []int64{7, 8, 7, 9, 6},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(5 * time.Minute)},
- data: []int64{7, 8, 7, 9, 0},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+ data: []int64{7, 8, 7, 9, 0},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
{
name: "less than the size",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
data: []int64{7, 8, 7},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
- data: []int64{7, 8, 7},
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []int64{7, 8, 7},
+ start: uint64(time.Minute),
+ end: uint64(3 * time.Minute),
},
},
{
name: "empty slot in the middle",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
data: []int64{7, 9},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 0, 0, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 0, 0, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
}
@@ -186,18 +217,29 @@ func TestNewIntDecoderGet(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
at := assert.New(t)
encoder := encoderPool.Get(key)
+ defer encoderPool.Put(encoder)
decoder := decoderPool.Get(key)
+ defer decoderPool.Put(decoder)
encoder.Reset(key)
+ isFull := false
for i, v := range tt.args.ts {
encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
if encoder.IsFull() {
+ isFull = true
break
}
}
bb, err := encoder.Encode()
at.NoError(err)
+
+ at.Equal(tt.want.start, encoder.StartTime())
at.NoError(decoder.Decode(key, bb))
- at.True(decoder.IsFull())
+ start, end := decoder.Range()
+ at.Equal(tt.want.start, start)
+ at.Equal(tt.want.end, end)
+ if isFull {
+ at.True(decoder.IsFull())
+ }
for i, t := range tt.want.ts {
v, err := decoder.Get(t)
at.NoError(err)
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 2284273..388ee8c 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -202,6 +202,9 @@ func (t *plainDecoder) Len() int {
}
func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
+ if len(rawData) < 2 {
+ return ErrInvalidValue
+ }
var data []byte
size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
@@ -242,6 +245,12 @@ func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
return getVal(t.val, parseOffset(slot))
}
+func (t *plainDecoder) Range() (start, end uint64) {
+ startSlot := getTSSlot(t.ts, int(t.num)-1)
+ endSlot := getTSSlot(t.ts, 0)
+ return parseTS(startSlot), parseTS(endSlot)
+}
+
func (t *plainDecoder) Iterator() SeriesIterator {
return newBlockItemIterator(t)
}
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index fe94b44..8259d7d 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -266,6 +266,10 @@ func (di *delegateIterator) Key() []byte {
return di.delegated.Key()
}
+func (di *delegateIterator) RawKey() []byte {
+ return di.delegated.RawKey()
+}
+
func (di *delegateIterator) Field() Field {
return di.curField
}
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index f56a68b..3d22f92 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -32,14 +32,13 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/encoding"
)
type ID string
const fieldFlagLength = 9
-var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(encoding.Zero)}}}
+var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 0}}}
var (
strDelimiter = []byte("\n")
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index d7722d4..3640595 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -153,7 +153,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
projectionTagsRefs: i.projectionTagsRefs,
projectionFieldsRefs: i.projectionFieldsRefs,
}
- if len(iters) == 1 || i.groupByEntity {
+ if i.groupByEntity {
return newSeriesMIterator(iters, transformContext), nil
}
c := logical.CreateComparator(i.Sort)