You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/21 05:23:07 UTC

[skywalking-banyandb] branch kv created (now c994206)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at c994206  Fix some flaws in kv

This branch includes the following new commits:

     new c994206  Fix some flaws in kv

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Fix some flaws in kv

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c99420652e256111d0e5cdcf9385fcc4130c7a42
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Nov 21 05:20:09 2022 +0000

    Fix some flaws in kv
    
    * Correct int encoding disorder
    * Add a print context helper for debugging encoding issues
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go                               |  23 +++++
 banyand/kv/kv.go                                   |   2 +
 banyand/measure/measure_query.go                   |  12 ++-
 banyand/tsdb/block.go                              |   5 +
 banyand/tsdb/series.go                             |   7 +-
 banyand/tsdb/series_seek.go                        |  69 +++++++++++++
 banyand/tsdb/series_seek_sort.go                   |  15 ++-
 pkg/encoding/encoding.go                           |   2 +
 pkg/encoding/int.go                                |  43 +++++---
 pkg/encoding/int_test.go                           | 110 ++++++++++++++-------
 pkg/encoding/plain.go                              |   9 ++
 pkg/index/iterator.go                              |   4 +
 pkg/pb/v1/write.go                                 |   3 +-
 .../measure/measure_plan_indexscan_local.go        |   2 +-
 14 files changed, 244 insertions(+), 62 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 4482721..9ef467b 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -48,6 +48,25 @@ type badgerTSS struct {
 	badger.TSet
 }
 
+func (b *badgerTSS) Context(key []byte, ts uint64, n int) (pre Iterator, next Iterator) {
+	preOpts := badger.DefaultIteratorOptions
+	preOpts.PrefetchSize = n
+	preOpts.PrefetchValues = false
+	preOpts.Prefix = key
+	preOpts.Reverse = false
+	nextOpts := badger.DefaultIteratorOptions
+	nextOpts.PrefetchSize = n
+	nextOpts.PrefetchValues = false
+	nextOpts.Prefix = key
+	nextOpts.Reverse = true
+	seekKey := y.KeyWithTs(key, ts)
+	preIter := b.db.NewIterator(preOpts)
+	preIter.Seek(seekKey)
+	nextIter := b.db.NewIterator(nextOpts)
+	nextIter.Seek(seekKey)
+	return &iterator{delegated: preIter}, &iterator{delegated: nextIter, reverse: true}
+}
+
 func (b *badgerTSS) Stats() (s observability.Statistics) {
 	return badgerStats(b.db)
 }
@@ -190,6 +209,10 @@ func (i *iterator) Key() []byte {
 	return y.ParseKey(i.delegated.Key())
 }
 
+func (i *iterator) RawKey() []byte {
+	return i.delegated.Key()
+}
+
 func (i *iterator) Val() []byte {
 	return y.Copy(i.delegated.Value().Value)
 }
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 90944a2..5dd9e53 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -80,6 +80,7 @@ type TimeSeriesWriter interface {
 type TimeSeriesReader interface {
 	// Get a value by its key and timestamp/version
 	Get(key []byte, ts uint64) ([]byte, error)
+	Context(key []byte, ts uint64, n int) (pre, next Iterator)
 }
 
 // TimeSeriesStore is time series storage
@@ -140,6 +141,7 @@ type Iterator interface {
 	Rewind()
 	Seek(key []byte)
 	Key() []byte
+	RawKey() []byte
 	Val() []byte
 	Valid() bool
 	Close() error
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..4581f3f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -114,10 +114,14 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
-	familyRawBytes, err := item.Family(familyIdentity(family, pbv1.TagFlag))
+	fid := familyIdentity(family, pbv1.TagFlag)
+	familyRawBytes, err := item.Family(fid)
 	if err != nil {
 		return nil, errors.Wrapf(err, "measure %s.%s parse family %s", s.name, s.group, family)
 	}
+	if len(familyRawBytes) < 1 {
+		item.PrintContext(s.l.Named("tag-family"), fid, 10)
+	}
 	tagFamily := &modelv1.TagFamilyForWrite{}
 	err = proto.Unmarshal(familyRawBytes, tagFamily)
 	if err != nil {
@@ -155,10 +159,14 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
 			break
 		}
 	}
-	bytes, err := item.Family(familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval)))
+	fid := familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval))
+	bytes, err := item.Family(fid)
 	if err != nil {
 		return nil, err
 	}
+	if len(bytes) < 1 {
+		item.PrintContext(s.l.Named("field"), fid, 10)
+	}
 	fieldValue, err := pbv1.DecodeFieldValue(bytes, fieldSpec)
 	if err != nil {
 		return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 5ba3331..bdfde12 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -322,6 +322,7 @@ type BlockDelegate interface {
 	writeLSMIndex(fields []index.Field, id common.ItemID) error
 	writeInvertedIndex(fields []index.Field, id common.ItemID) error
 	dataReader() kv.TimeSeriesReader
+	decoderPool() encoding.SeriesDecoderPool
 	lsmIndexReader() index.Searcher
 	invertedIndexReader() index.Searcher
 	primaryIndexReader() index.FieldIterable
@@ -336,6 +337,10 @@ type bDelegate struct {
 	delegate *block
 }
 
+func (d *bDelegate) decoderPool() encoding.SeriesDecoderPool {
+	return d.delegate.encodingMethod.DecoderPool
+}
+
 func (d *bDelegate) dataReader() kv.TimeSeriesReader {
 	return d.delegate.store
 }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 09d62aa..4d44f30 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -104,9 +104,10 @@ func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, err
 		return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
 	}
 	return &item{
-		data:     b.dataReader(),
-		itemID:   id.ID,
-		seriesID: s.id,
+		data:        b.dataReader(),
+		itemID:      id.ID,
+		seriesID:    s.id,
+		decoderPool: b.decoderPool(),
 	}, b, nil
 }
 
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index b3f08e1..18db6e2 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,11 +18,19 @@
 package tsdb
 
 import (
+	"bytes"
+	"encoding/hex"
+	"time"
+
+	"github.com/dgraph-io/badger/v3/y"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type Iterator interface {
@@ -33,6 +41,7 @@ type Iterator interface {
 
 type Item interface {
 	Family(family []byte) ([]byte, error)
+	PrintContext(l *logger.Logger, family []byte, n int)
 	Val() ([]byte, error)
 	ID() common.ItemID
 	SortedField() []byte
@@ -59,6 +68,7 @@ type seekerBuilder struct {
 	order               modelv1.Sort
 	indexRuleForSorting *databasev1.IndexRule
 	rangeOptsForSorting index.RangeOpts
+	l                   *logger.Logger
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
@@ -75,6 +85,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
 func newSeekerBuilder(s *seriesSpan) SeekerBuilder {
 	return &seekerBuilder{
 		seriesSpan: s,
+		l:          logger.GetLogger("seeker-builder"),
 	}
 }
 
@@ -101,6 +112,7 @@ type item struct {
 	data        kv.TimeSeriesReader
 	seriesID    common.SeriesID
 	sortedField []byte
+	decoderPool encoding.SeriesDecoderPool
 }
 
 func (i *item) Time() uint64 {
@@ -119,6 +131,63 @@ func (i *item) Family(family []byte) ([]byte, error) {
 	return i.data.Get(d.marshal(), uint64(i.itemID))
 }
 
+func (i *item) PrintContext(l *logger.Logger, family []byte, n int) {
+	decoder := i.decoderPool.Get(family)
+	defer i.decoderPool.Put(decoder)
+	d := dataBucket{
+		seriesID: i.seriesID,
+		family:   family,
+	}
+	key := d.marshal()
+	pre, next := i.data.Context(key, uint64(i.itemID), n)
+	defer pre.Close()
+	defer next.Close()
+	j := 0
+	currentTS := uint64(i.itemID)
+
+	each := func(iter kv.Iterator) {
+		if !bytes.Equal(key, iter.Key()) {
+			return
+		}
+		j++
+
+		ts := y.ParseTs(iter.RawKey())
+
+		logEvent := l.Info().Int("i", j).
+			Time("ts", time.Unix(0, int64(y.ParseTs(iter.RawKey()))))
+		if err := decoder.Decode(family, iter.Val()); err != nil {
+			logEvent = logEvent.Str("loc", "mem")
+			if ts == currentTS {
+				logEvent = logEvent.Bool("at", true)
+			}
+		} else {
+			start, end := decoder.Range()
+			logEvent = logEvent.Time("start", time.Unix(0, int64(start))).
+				Time("end", time.Unix(0, int64(end))).Int("num", decoder.Len()).Str("loc", "table")
+			if start <= currentTS && currentTS <= end {
+				if dd, err := decoder.Get(currentTS); err == nil && len(dd) > 0 {
+					logEvent = logEvent.Bool("at", true)
+				}
+			}
+		}
+		logEvent.Send()
+	}
+
+	s := hex.EncodeToString(key)
+	if len(s) > 7 {
+		s = s[:7]
+	}
+	l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print previous lines")
+	for ; pre.Valid() && j < n; pre.Next() {
+		each(pre)
+	}
+	j = 0
+	l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print next lines")
+	for ; next.Valid() && j < n; next.Next() {
+		each(next)
+	}
+}
+
 func (i *item) Val() ([]byte, error) {
 	d := dataBucket{
 		seriesID: i.seriesID,
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 76bc194..910499e 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -28,6 +28,7 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -88,7 +89,8 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) {
 			return nil, err
 		}
 		if inner != nil {
-			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
+			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+				s.seriesSpan.seriesID, filters))
 		}
 	}
 	return
@@ -134,9 +136,11 @@ func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, error) {
 				return nil, err
 			}
 			if filter == nil {
-				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, emptyFilters))
+				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+					s.seriesSpan.seriesID, emptyFilters))
 			} else {
-				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, []filterFn{filter}))
+				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+					s.seriesSpan.seriesID, []filterFn{filter}))
 			}
 		}
 	}
@@ -156,6 +160,7 @@ type searcherIterator struct {
 	curKey        []byte
 	cur           posting.Iterator
 	data          kv.TimeSeriesReader
+	decoderPool   encoding.SeriesDecoderPool
 	seriesID      common.SeriesID
 	filters       []filterFn
 	l             *logger.Logger
@@ -193,6 +198,7 @@ func (s *searcherIterator) Val() Item {
 		itemID:      s.cur.Current(),
 		data:        s.data,
 		seriesID:    s.seriesID,
+		decoderPool: s.decoderPool,
 	}
 }
 
@@ -201,7 +207,7 @@ func (s *searcherIterator) Close() error {
 }
 
 func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
-	seriesID common.SeriesID, filters []filterFn,
+	decoderPool encoding.SeriesDecoderPool, seriesID common.SeriesID, filters []filterFn,
 ) Iterator {
 	return &searcherIterator{
 		fieldIterator: fieldIterator,
@@ -209,6 +215,7 @@ func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, da
 		seriesID:      seriesID,
 		filters:       filters,
 		l:             l,
+		decoderPool:   decoderPool,
 	}
 }
 
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 0382ee2..efc1f6e 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -79,6 +79,8 @@ type SeriesDecoder interface {
 	Get(ts uint64) ([]byte, error)
 	// Iterator returns a SeriesIterator
 	Iterator() SeriesIterator
+	// Range returns the start and end time of this series
+	Range() (start, end uint64)
 }
 
 // SeriesIterator iterates time series data
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index c9e1991..c9a48a4 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -138,8 +138,10 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
 	if ie.startTime == 0 {
 		ie.startTime = ts
 		ie.prevTime = ts
+	} else if ie.startTime > ts {
+		ie.startTime = ts
 	}
-	gap := int(ts) - int(ie.prevTime)
+	gap := int(ie.prevTime) - int(ts)
 	if gap < 0 {
 		return
 	}
@@ -166,13 +168,15 @@ func (ie *intEncoder) Reset(key []byte) {
 	ie.interval = ie.fn(key)
 	ie.startTime = 0
 	ie.prevTime = 0
+	ie.num = 0
+	ie.values = NewXOREncoder(ie.bw)
 }
 
 func (ie *intEncoder) Encode() ([]byte, error) {
 	ie.bw.Flush()
 	buffWriter := buffer.NewBufferWriter(ie.buff)
 	buffWriter.PutUint64(ie.startTime)
-	buffWriter.PutUint16(uint16(ie.size))
+	buffWriter.PutUint16(uint16(ie.num))
 	bb := buffWriter.Bytes()
 	encodedSize.WithLabelValues(ie.name, "int").Add(float64(len(bb)))
 	return bb, nil
@@ -195,6 +199,9 @@ type intDecoder struct {
 }
 
 func (i *intDecoder) Decode(key, data []byte) error {
+	if len(data) < 10 {
+		return ErrInvalidValue
+	}
 	i.interval = i.fn(key)
 	i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
 	i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -219,29 +226,33 @@ func (i intDecoder) Get(ts uint64) ([]byte, error) {
 	return zeroBytes, nil
 }
 
+func (i intDecoder) Range() (start, end uint64) {
+	return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
+}
+
 func (i intDecoder) Iterator() SeriesIterator {
 	br := bit.NewReader(bytes.NewReader(i.area))
 	return &intIterator{
-		startTime: i.startTime,
-		interval:  int(i.interval),
-		br:        br,
-		values:    NewXORDecoder(br),
-		size:      i.size,
+		endTime:  i.startTime + uint64(i.num*int(i.interval)),
+		interval: int(i.interval),
+		br:       br,
+		values:   NewXORDecoder(br),
+		size:     i.num,
 	}
 }
 
 var (
 	_         SeriesIterator = (*intIterator)(nil)
-	zeroBytes                = convert.Int64ToBytes(0)
-	Zero                     = convert.BytesToUint64(zeroBytes)
+	zeroBytes                = convert.Uint64ToBytes(zero)
+	zero                     = convert.BytesToUint64(convert.Int64ToBytes(0))
 )
 
 type intIterator struct {
-	startTime uint64
-	interval  int
-	size      int
-	br        *bit.Reader
-	values    *XORDecoder
+	endTime  uint64
+	interval int
+	size     int
+	br       *bit.Reader
+	values   *XORDecoder
 
 	currVal  uint64
 	currTime uint64
@@ -266,10 +277,10 @@ func (i *intIterator) Next() bool {
 			i.currVal = i.values.Value()
 		}
 	} else {
-		i.currVal = Zero
+		i.currVal = zero
 	}
-	i.currTime = i.startTime + uint64(i.interval*i.index)
 	i.index++
+	i.currTime = i.endTime - uint64(i.interval*i.index)
 	return true
 }
 
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
index 657bbe5..1f08278 100644
--- a/pkg/encoding/int_test.go
+++ b/pkg/encoding/int_test.go
@@ -28,8 +28,10 @@ import (
 
 func TestNewIntEncoderAndDecoder(t *testing.T) {
 	type tsData struct {
-		ts   []uint64
-		data []int64
+		ts    []uint64
+		data  []int64
+		start uint64
+		end   uint64
 	}
 	tests := []struct {
 		name string
@@ -39,45 +41,53 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
 		{
 			name: "golden path",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "more than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9, 6},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "less than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 8, 7},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
-				data: []int64{7, 8, 7},
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []int64{7, 8, 7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
 			},
 		},
 		{
 			name: "empty slot in the middle",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 0, 0, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 0, 0, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 	}
@@ -93,36 +103,49 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			at := assert.New(t)
 			encoder := encoderPool.Get(key)
+			defer encoderPool.Put(encoder)
 			decoder := decoderPool.Get(key)
+			defer decoderPool.Put(decoder)
 			encoder.Reset(key)
+			isFull := false
 			for i, v := range tt.args.ts {
 				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
 				if encoder.IsFull() {
+					isFull = true
 					break
 				}
 			}
 			bb, err := encoder.Encode()
 			at.NoError(err)
+
+			at.Equal(tt.want.start, encoder.StartTime())
 			at.NoError(decoder.Decode(key, bb))
-			at.True(decoder.IsFull())
-			iter := decoder.Iterator()
-			for i, t := range tt.want.ts {
-				at.True(iter.Next())
+			start, end := decoder.Range()
+			at.Equal(tt.want.start, start)
+			at.Equal(tt.want.end, end)
+			if isFull {
+				at.True(decoder.IsFull())
+			}
+			i := 0
+			for iter := decoder.Iterator(); iter.Next(); i++ {
 				at.NoError(iter.Error())
 				at.Equal(tt.want.ts[i], iter.Time())
 				at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
-				v, err := decoder.Get(t)
+				v, err := decoder.Get(iter.Time())
 				at.NoError(err)
 				at.Equal(tt.want.data[i], convert.BytesToInt64(v))
 			}
+			at.Equal(len(tt.want.ts), i)
 		})
 	}
 }
 
 func TestNewIntDecoderGet(t *testing.T) {
 	type tsData struct {
-		ts   []uint64
-		data []int64
+		ts    []uint64
+		data  []int64
+		start uint64
+		end   uint64
 	}
 	tests := []struct {
 		name string
@@ -132,45 +155,53 @@ func TestNewIntDecoderGet(t *testing.T) {
 		{
 			name: "golden path",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "more than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9, 6},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(5 * time.Minute)},
-				data: []int64{7, 8, 7, 9, 0},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+				data:  []int64{7, 8, 7, 9, 0},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "less than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 8, 7},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
-				data: []int64{7, 8, 7},
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []int64{7, 8, 7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
 			},
 		},
 		{
 			name: "empty slot in the middle",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 0, 0, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 0, 0, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 	}
@@ -186,18 +217,29 @@ func TestNewIntDecoderGet(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			at := assert.New(t)
 			encoder := encoderPool.Get(key)
+			defer encoderPool.Put(encoder)
 			decoder := decoderPool.Get(key)
+			defer decoderPool.Put(decoder)
 			encoder.Reset(key)
+			isFull := false
 			for i, v := range tt.args.ts {
 				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
 				if encoder.IsFull() {
+					isFull = true
 					break
 				}
 			}
 			bb, err := encoder.Encode()
 			at.NoError(err)
+
+			at.Equal(tt.want.start, encoder.StartTime())
 			at.NoError(decoder.Decode(key, bb))
-			at.True(decoder.IsFull())
+			start, end := decoder.Range()
+			at.Equal(tt.want.start, start)
+			at.Equal(tt.want.end, end)
+			if isFull {
+				at.True(decoder.IsFull())
+			}
 			for i, t := range tt.want.ts {
 				v, err := decoder.Get(t)
 				at.NoError(err)
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 2284273..388ee8c 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -202,6 +202,9 @@ func (t *plainDecoder) Len() int {
 }
 
 func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
+	if len(rawData) < 2 {
+		return ErrInvalidValue
+	}
 	var data []byte
 	size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
 	if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
@@ -242,6 +245,12 @@ func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
 	return getVal(t.val, parseOffset(slot))
 }
 
+func (t *plainDecoder) Range() (start, end uint64) {
+	startSlot := getTSSlot(t.ts, int(t.num)-1)
+	endSlot := getTSSlot(t.ts, 0)
+	return parseTS(startSlot), parseTS(endSlot)
+}
+
 func (t *plainDecoder) Iterator() SeriesIterator {
 	return newBlockItemIterator(t)
 }
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index fe94b44..8259d7d 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -266,6 +266,10 @@ func (di *delegateIterator) Key() []byte {
 	return di.delegated.Key()
 }
 
+func (di *delegateIterator) RawKey() []byte {
+	return di.delegated.RawKey()
+}
+
 func (di *delegateIterator) Field() Field {
 	return di.curField
 }
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index f56a68b..3d22f92 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -32,14 +32,13 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/encoding"
 )
 
 type ID string
 
 const fieldFlagLength = 9
 
-var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(encoding.Zero)}}}
+var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 0}}}
 
 var (
 	strDelimiter = []byte("\n")
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index d7722d4..3640595 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -153,7 +153,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
 		projectionTagsRefs:   i.projectionTagsRefs,
 		projectionFieldsRefs: i.projectionFieldsRefs,
 	}
-	if len(iters) == 1 || i.groupByEntity {
+	if i.groupByEntity {
 		return newSeriesMIterator(iters, transformContext), nil
 	}
 	c := logical.CreateComparator(i.Sort)