You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/21 05:23:08 UTC
[skywalking-banyandb] 01/01: Fix some flaws in kv
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c99420652e256111d0e5cdcf9385fcc4130c7a42
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Nov 21 05:20:09 2022 +0000
Fix some flaws in kv
* Correct int encoding disorder
* Add a print context helper for debugging encoding issues
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 23 +++++
banyand/kv/kv.go | 2 +
banyand/measure/measure_query.go | 12 ++-
banyand/tsdb/block.go | 5 +
banyand/tsdb/series.go | 7 +-
banyand/tsdb/series_seek.go | 69 +++++++++++++
banyand/tsdb/series_seek_sort.go | 15 ++-
pkg/encoding/encoding.go | 2 +
pkg/encoding/int.go | 43 +++++---
pkg/encoding/int_test.go | 110 ++++++++++++++-------
pkg/encoding/plain.go | 9 ++
pkg/index/iterator.go | 4 +
pkg/pb/v1/write.go | 3 +-
.../measure/measure_plan_indexscan_local.go | 2 +-
14 files changed, 244 insertions(+), 62 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 4482721..9ef467b 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -48,6 +48,25 @@ type badgerTSS struct {
badger.TSet
}
+func (b *badgerTSS) Context(key []byte, ts uint64, n int) (pre Iterator, next Iterator) {
+ preOpts := badger.DefaultIteratorOptions
+ preOpts.PrefetchSize = n
+ preOpts.PrefetchValues = false
+ preOpts.Prefix = key
+ preOpts.Reverse = false
+ nextOpts := badger.DefaultIteratorOptions
+ nextOpts.PrefetchSize = n
+ nextOpts.PrefetchValues = false
+ nextOpts.Prefix = key
+ nextOpts.Reverse = true
+ seekKey := y.KeyWithTs(key, ts)
+ preIter := b.db.NewIterator(preOpts)
+ preIter.Seek(seekKey)
+ nextIter := b.db.NewIterator(nextOpts)
+ nextIter.Seek(seekKey)
+ return &iterator{delegated: preIter}, &iterator{delegated: nextIter, reverse: true}
+}
+
func (b *badgerTSS) Stats() (s observability.Statistics) {
return badgerStats(b.db)
}
@@ -190,6 +209,10 @@ func (i *iterator) Key() []byte {
return y.ParseKey(i.delegated.Key())
}
+func (i *iterator) RawKey() []byte {
+ return i.delegated.Key()
+}
+
func (i *iterator) Val() []byte {
return y.Copy(i.delegated.Value().Value)
}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 90944a2..5dd9e53 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -80,6 +80,7 @@ type TimeSeriesWriter interface {
type TimeSeriesReader interface {
// Get a value by its key and timestamp/version
Get(key []byte, ts uint64) ([]byte, error)
+ Context(key []byte, ts uint64, n int) (pre, next Iterator)
}
// TimeSeriesStore is time series storage
@@ -140,6 +141,7 @@ type Iterator interface {
Rewind()
Seek(key []byte)
Key() []byte
+ RawKey() []byte
Val() []byte
Valid() bool
Close() error
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..4581f3f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -114,10 +114,14 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
}
func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
- familyRawBytes, err := item.Family(familyIdentity(family, pbv1.TagFlag))
+ fid := familyIdentity(family, pbv1.TagFlag)
+ familyRawBytes, err := item.Family(fid)
if err != nil {
return nil, errors.Wrapf(err, "measure %s.%s parse family %s", s.name, s.group, family)
}
+ if len(familyRawBytes) < 1 {
+ item.PrintContext(s.l.Named("tag-family"), fid, 10)
+ }
tagFamily := &modelv1.TagFamilyForWrite{}
err = proto.Unmarshal(familyRawBytes, tagFamily)
if err != nil {
@@ -155,10 +159,14 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
break
}
}
- bytes, err := item.Family(familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval)))
+ fid := familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval))
+ bytes, err := item.Family(fid)
if err != nil {
return nil, err
}
+ if len(bytes) < 1 {
+ item.PrintContext(s.l.Named("field"), fid, 10)
+ }
fieldValue, err := pbv1.DecodeFieldValue(bytes, fieldSpec)
if err != nil {
return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 5ba3331..bdfde12 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -322,6 +322,7 @@ type BlockDelegate interface {
writeLSMIndex(fields []index.Field, id common.ItemID) error
writeInvertedIndex(fields []index.Field, id common.ItemID) error
dataReader() kv.TimeSeriesReader
+ decoderPool() encoding.SeriesDecoderPool
lsmIndexReader() index.Searcher
invertedIndexReader() index.Searcher
primaryIndexReader() index.FieldIterable
@@ -336,6 +337,10 @@ type bDelegate struct {
delegate *block
}
+func (d *bDelegate) decoderPool() encoding.SeriesDecoderPool {
+ return d.delegate.encodingMethod.DecoderPool
+}
+
func (d *bDelegate) dataReader() kv.TimeSeriesReader {
return d.delegate.store
}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 09d62aa..4d44f30 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -104,9 +104,10 @@ func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, err
return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
}
return &item{
- data: b.dataReader(),
- itemID: id.ID,
- seriesID: s.id,
+ data: b.dataReader(),
+ itemID: id.ID,
+ seriesID: s.id,
+ decoderPool: b.decoderPool(),
}, b, nil
}
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index b3f08e1..18db6e2 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,11 +18,19 @@
package tsdb
import (
+ "bytes"
+ "encoding/hex"
+ "time"
+
+ "github.com/dgraph-io/badger/v3/y"
+
"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
type Iterator interface {
@@ -33,6 +41,7 @@ type Iterator interface {
type Item interface {
Family(family []byte) ([]byte, error)
+ PrintContext(l *logger.Logger, family []byte, n int)
Val() ([]byte, error)
ID() common.ItemID
SortedField() []byte
@@ -59,6 +68,7 @@ type seekerBuilder struct {
order modelv1.Sort
indexRuleForSorting *databasev1.IndexRule
rangeOptsForSorting index.RangeOpts
+ l *logger.Logger
}
func (s *seekerBuilder) Build() (Seeker, error) {
@@ -75,6 +85,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
func newSeekerBuilder(s *seriesSpan) SeekerBuilder {
return &seekerBuilder{
seriesSpan: s,
+ l: logger.GetLogger("seeker-builder"),
}
}
@@ -101,6 +112,7 @@ type item struct {
data kv.TimeSeriesReader
seriesID common.SeriesID
sortedField []byte
+ decoderPool encoding.SeriesDecoderPool
}
func (i *item) Time() uint64 {
@@ -119,6 +131,63 @@ func (i *item) Family(family []byte) ([]byte, error) {
return i.data.Get(d.marshal(), uint64(i.itemID))
}
+func (i *item) PrintContext(l *logger.Logger, family []byte, n int) {
+ decoder := i.decoderPool.Get(family)
+ defer i.decoderPool.Put(decoder)
+ d := dataBucket{
+ seriesID: i.seriesID,
+ family: family,
+ }
+ key := d.marshal()
+ pre, next := i.data.Context(key, uint64(i.itemID), n)
+ defer pre.Close()
+ defer next.Close()
+ j := 0
+ currentTS := uint64(i.itemID)
+
+ each := func(iter kv.Iterator) {
+ if !bytes.Equal(key, iter.Key()) {
+ return
+ }
+ j++
+
+ ts := y.ParseTs(iter.RawKey())
+
+ logEvent := l.Info().Int("i", j).
+ Time("ts", time.Unix(0, int64(y.ParseTs(iter.RawKey()))))
+ if err := decoder.Decode(family, iter.Val()); err != nil {
+ logEvent = logEvent.Str("loc", "mem")
+ if ts == currentTS {
+ logEvent = logEvent.Bool("at", true)
+ }
+ } else {
+ start, end := decoder.Range()
+ logEvent = logEvent.Time("start", time.Unix(0, int64(start))).
+ Time("end", time.Unix(0, int64(end))).Int("num", decoder.Len()).Str("loc", "table")
+ if start <= currentTS && currentTS <= end {
+ if dd, err := decoder.Get(currentTS); err == nil && len(dd) > 0 {
+ logEvent = logEvent.Bool("at", true)
+ }
+ }
+ }
+ logEvent.Send()
+ }
+
+ s := hex.EncodeToString(key)
+ if len(s) > 7 {
+ s = s[:7]
+ }
+ l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print previous lines")
+ for ; pre.Valid() && j < n; pre.Next() {
+ each(pre)
+ }
+ j = 0
+ l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print next lines")
+ for ; next.Valid() && j < n; next.Next() {
+ each(next)
+ }
+}
+
func (i *item) Val() ([]byte, error) {
d := dataBucket{
seriesID: i.seriesID,
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 76bc194..910499e 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -28,6 +28,7 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -88,7 +89,8 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) {
return nil, err
}
if inner != nil {
- series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
+ series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+ s.seriesSpan.seriesID, filters))
}
}
return
@@ -134,9 +136,11 @@ func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, error) {
return nil, err
}
if filter == nil {
- delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, emptyFilters))
+ delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+ s.seriesSpan.seriesID, emptyFilters))
} else {
- delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, []filterFn{filter}))
+ delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+ s.seriesSpan.seriesID, []filterFn{filter}))
}
}
}
@@ -156,6 +160,7 @@ type searcherIterator struct {
curKey []byte
cur posting.Iterator
data kv.TimeSeriesReader
+ decoderPool encoding.SeriesDecoderPool
seriesID common.SeriesID
filters []filterFn
l *logger.Logger
@@ -193,6 +198,7 @@ func (s *searcherIterator) Val() Item {
itemID: s.cur.Current(),
data: s.data,
seriesID: s.seriesID,
+ decoderPool: s.decoderPool,
}
}
@@ -201,7 +207,7 @@ func (s *searcherIterator) Close() error {
}
func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
- seriesID common.SeriesID, filters []filterFn,
+ decoderPool encoding.SeriesDecoderPool, seriesID common.SeriesID, filters []filterFn,
) Iterator {
return &searcherIterator{
fieldIterator: fieldIterator,
@@ -209,6 +215,7 @@ func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, da
seriesID: seriesID,
filters: filters,
l: l,
+ decoderPool: decoderPool,
}
}
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 0382ee2..efc1f6e 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -79,6 +79,8 @@ type SeriesDecoder interface {
Get(ts uint64) ([]byte, error)
// Iterator returns a SeriesIterator
Iterator() SeriesIterator
+ // Range returns the start and end time of this series
+ Range() (start, end uint64)
}
// SeriesIterator iterates time series data
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index c9e1991..c9a48a4 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -138,8 +138,10 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
if ie.startTime == 0 {
ie.startTime = ts
ie.prevTime = ts
+ } else if ie.startTime > ts {
+ ie.startTime = ts
}
- gap := int(ts) - int(ie.prevTime)
+ gap := int(ie.prevTime) - int(ts)
if gap < 0 {
return
}
@@ -166,13 +168,15 @@ func (ie *intEncoder) Reset(key []byte) {
ie.interval = ie.fn(key)
ie.startTime = 0
ie.prevTime = 0
+ ie.num = 0
+ ie.values = NewXOREncoder(ie.bw)
}
func (ie *intEncoder) Encode() ([]byte, error) {
ie.bw.Flush()
buffWriter := buffer.NewBufferWriter(ie.buff)
buffWriter.PutUint64(ie.startTime)
- buffWriter.PutUint16(uint16(ie.size))
+ buffWriter.PutUint16(uint16(ie.num))
bb := buffWriter.Bytes()
encodedSize.WithLabelValues(ie.name, "int").Add(float64(len(bb)))
return bb, nil
@@ -195,6 +199,9 @@ type intDecoder struct {
}
func (i *intDecoder) Decode(key, data []byte) error {
+ if len(data) < 10 {
+ return ErrInvalidValue
+ }
i.interval = i.fn(key)
i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -219,29 +226,33 @@ func (i intDecoder) Get(ts uint64) ([]byte, error) {
return zeroBytes, nil
}
+func (i intDecoder) Range() (start, end uint64) {
+ return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
+}
+
func (i intDecoder) Iterator() SeriesIterator {
br := bit.NewReader(bytes.NewReader(i.area))
return &intIterator{
- startTime: i.startTime,
- interval: int(i.interval),
- br: br,
- values: NewXORDecoder(br),
- size: i.size,
+ endTime: i.startTime + uint64(i.num*int(i.interval)),
+ interval: int(i.interval),
+ br: br,
+ values: NewXORDecoder(br),
+ size: i.num,
}
}
var (
_ SeriesIterator = (*intIterator)(nil)
- zeroBytes = convert.Int64ToBytes(0)
- Zero = convert.BytesToUint64(zeroBytes)
+ zeroBytes = convert.Uint64ToBytes(zero)
+ zero = convert.BytesToUint64(convert.Int64ToBytes(0))
)
type intIterator struct {
- startTime uint64
- interval int
- size int
- br *bit.Reader
- values *XORDecoder
+ endTime uint64
+ interval int
+ size int
+ br *bit.Reader
+ values *XORDecoder
currVal uint64
currTime uint64
@@ -266,10 +277,10 @@ func (i *intIterator) Next() bool {
i.currVal = i.values.Value()
}
} else {
- i.currVal = Zero
+ i.currVal = zero
}
- i.currTime = i.startTime + uint64(i.interval*i.index)
i.index++
+ i.currTime = i.endTime - uint64(i.interval*i.index)
return true
}
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
index 657bbe5..1f08278 100644
--- a/pkg/encoding/int_test.go
+++ b/pkg/encoding/int_test.go
@@ -28,8 +28,10 @@ import (
func TestNewIntEncoderAndDecoder(t *testing.T) {
type tsData struct {
- ts []uint64
- data []int64
+ ts []uint64
+ data []int64
+ start uint64
+ end uint64
}
tests := []struct {
name string
@@ -39,45 +41,53 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
{
name: "golden path",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
data: []int64{7, 8, 7, 9},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 8, 7, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
{
name: "more than the size",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
data: []int64{7, 8, 7, 9, 6},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 8, 7, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
{
name: "less than the size",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
data: []int64{7, 8, 7},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
- data: []int64{7, 8, 7},
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []int64{7, 8, 7},
+ start: uint64(time.Minute),
+ end: uint64(3 * time.Minute),
},
},
{
name: "empty slot in the middle",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
data: []int64{7, 9},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 0, 0, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 0, 0, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
}
@@ -93,36 +103,49 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
at := assert.New(t)
encoder := encoderPool.Get(key)
+ defer encoderPool.Put(encoder)
decoder := decoderPool.Get(key)
+ defer decoderPool.Put(decoder)
encoder.Reset(key)
+ isFull := false
for i, v := range tt.args.ts {
encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
if encoder.IsFull() {
+ isFull = true
break
}
}
bb, err := encoder.Encode()
at.NoError(err)
+
+ at.Equal(tt.want.start, encoder.StartTime())
at.NoError(decoder.Decode(key, bb))
- at.True(decoder.IsFull())
- iter := decoder.Iterator()
- for i, t := range tt.want.ts {
- at.True(iter.Next())
+ start, end := decoder.Range()
+ at.Equal(tt.want.start, start)
+ at.Equal(tt.want.end, end)
+ if isFull {
+ at.True(decoder.IsFull())
+ }
+ i := 0
+ for iter := decoder.Iterator(); iter.Next(); i++ {
at.NoError(iter.Error())
at.Equal(tt.want.ts[i], iter.Time())
at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
- v, err := decoder.Get(t)
+ v, err := decoder.Get(iter.Time())
at.NoError(err)
at.Equal(tt.want.data[i], convert.BytesToInt64(v))
}
+ at.Equal(len(tt.want.ts), i)
})
}
}
func TestNewIntDecoderGet(t *testing.T) {
type tsData struct {
- ts []uint64
- data []int64
+ ts []uint64
+ data []int64
+ start uint64
+ end uint64
}
tests := []struct {
name string
@@ -132,45 +155,53 @@ func TestNewIntDecoderGet(t *testing.T) {
{
name: "golden path",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
data: []int64{7, 8, 7, 9},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 8, 7, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
{
name: "more than the size",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
data: []int64{7, 8, 7, 9, 6},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(5 * time.Minute)},
- data: []int64{7, 8, 7, 9, 0},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+ data: []int64{7, 8, 7, 9, 0},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
{
name: "less than the size",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
data: []int64{7, 8, 7},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
- data: []int64{7, 8, 7},
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []int64{7, 8, 7},
+ start: uint64(time.Minute),
+ end: uint64(3 * time.Minute),
},
},
{
name: "empty slot in the middle",
args: tsData{
- ts: []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+ ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
data: []int64{7, 9},
},
want: tsData{
- ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
- data: []int64{7, 0, 0, 9},
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []int64{7, 0, 0, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
},
},
}
@@ -186,18 +217,29 @@ func TestNewIntDecoderGet(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
at := assert.New(t)
encoder := encoderPool.Get(key)
+ defer encoderPool.Put(encoder)
decoder := decoderPool.Get(key)
+ defer decoderPool.Put(decoder)
encoder.Reset(key)
+ isFull := false
for i, v := range tt.args.ts {
encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
if encoder.IsFull() {
+ isFull = true
break
}
}
bb, err := encoder.Encode()
at.NoError(err)
+
+ at.Equal(tt.want.start, encoder.StartTime())
at.NoError(decoder.Decode(key, bb))
- at.True(decoder.IsFull())
+ start, end := decoder.Range()
+ at.Equal(tt.want.start, start)
+ at.Equal(tt.want.end, end)
+ if isFull {
+ at.True(decoder.IsFull())
+ }
for i, t := range tt.want.ts {
v, err := decoder.Get(t)
at.NoError(err)
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 2284273..388ee8c 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -202,6 +202,9 @@ func (t *plainDecoder) Len() int {
}
func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
+ if len(rawData) < 2 {
+ return ErrInvalidValue
+ }
var data []byte
size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
@@ -242,6 +245,12 @@ func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
return getVal(t.val, parseOffset(slot))
}
+func (t *plainDecoder) Range() (start, end uint64) {
+ startSlot := getTSSlot(t.ts, int(t.num)-1)
+ endSlot := getTSSlot(t.ts, 0)
+ return parseTS(startSlot), parseTS(endSlot)
+}
+
func (t *plainDecoder) Iterator() SeriesIterator {
return newBlockItemIterator(t)
}
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index fe94b44..8259d7d 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -266,6 +266,10 @@ func (di *delegateIterator) Key() []byte {
return di.delegated.Key()
}
+func (di *delegateIterator) RawKey() []byte {
+ return di.delegated.RawKey()
+}
+
func (di *delegateIterator) Field() Field {
return di.curField
}
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index f56a68b..3d22f92 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -32,14 +32,13 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/encoding"
)
type ID string
const fieldFlagLength = 9
-var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(encoding.Zero)}}}
+var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 0}}}
var (
strDelimiter = []byte("\n")
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index d7722d4..3640595 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -153,7 +153,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
projectionTagsRefs: i.projectionTagsRefs,
projectionFieldsRefs: i.projectionFieldsRefs,
}
- if len(iters) == 1 || i.groupByEntity {
+ if i.groupByEntity {
return newSeriesMIterator(iters, transformContext), nil
}
c := logical.CreateComparator(i.Sort)