You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/21 05:23:08 UTC

[skywalking-banyandb] 01/01: Fix some flaws in kv

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c99420652e256111d0e5cdcf9385fcc4130c7a42
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Nov 21 05:20:09 2022 +0000

    Fix some flaws in kv
    
    * Correct int encoding disorder
    * Add a print context helper for debugging encoding issues
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go                               |  23 +++++
 banyand/kv/kv.go                                   |   2 +
 banyand/measure/measure_query.go                   |  12 ++-
 banyand/tsdb/block.go                              |   5 +
 banyand/tsdb/series.go                             |   7 +-
 banyand/tsdb/series_seek.go                        |  69 +++++++++++++
 banyand/tsdb/series_seek_sort.go                   |  15 ++-
 pkg/encoding/encoding.go                           |   2 +
 pkg/encoding/int.go                                |  43 +++++---
 pkg/encoding/int_test.go                           | 110 ++++++++++++++-------
 pkg/encoding/plain.go                              |   9 ++
 pkg/index/iterator.go                              |   4 +
 pkg/pb/v1/write.go                                 |   3 +-
 .../measure/measure_plan_indexscan_local.go        |   2 +-
 14 files changed, 244 insertions(+), 62 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 4482721..9ef467b 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -48,6 +48,25 @@ type badgerTSS struct {
 	badger.TSet
 }
 
+func (b *badgerTSS) Context(key []byte, ts uint64, n int) (pre Iterator, next Iterator) {
+	preOpts := badger.DefaultIteratorOptions
+	preOpts.PrefetchSize = n
+	preOpts.PrefetchValues = false
+	preOpts.Prefix = key
+	preOpts.Reverse = false
+	nextOpts := badger.DefaultIteratorOptions
+	nextOpts.PrefetchSize = n
+	nextOpts.PrefetchValues = false
+	nextOpts.Prefix = key
+	nextOpts.Reverse = true
+	seekKey := y.KeyWithTs(key, ts)
+	preIter := b.db.NewIterator(preOpts)
+	preIter.Seek(seekKey)
+	nextIter := b.db.NewIterator(nextOpts)
+	nextIter.Seek(seekKey)
+	return &iterator{delegated: preIter}, &iterator{delegated: nextIter, reverse: true}
+}
+
 func (b *badgerTSS) Stats() (s observability.Statistics) {
 	return badgerStats(b.db)
 }
@@ -190,6 +209,10 @@ func (i *iterator) Key() []byte {
 	return y.ParseKey(i.delegated.Key())
 }
 
+func (i *iterator) RawKey() []byte {
+	return i.delegated.Key()
+}
+
 func (i *iterator) Val() []byte {
 	return y.Copy(i.delegated.Value().Value)
 }
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 90944a2..5dd9e53 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -80,6 +80,7 @@ type TimeSeriesWriter interface {
 type TimeSeriesReader interface {
 	// Get a value by its key and timestamp/version
 	Get(key []byte, ts uint64) ([]byte, error)
+	Context(key []byte, ts uint64, n int) (pre, next Iterator)
 }
 
 // TimeSeriesStore is time series storage
@@ -140,6 +141,7 @@ type Iterator interface {
 	Rewind()
 	Seek(key []byte)
 	Key() []byte
+	RawKey() []byte
 	Val() []byte
 	Valid() bool
 	Close() error
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..4581f3f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -114,10 +114,14 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
-	familyRawBytes, err := item.Family(familyIdentity(family, pbv1.TagFlag))
+	fid := familyIdentity(family, pbv1.TagFlag)
+	familyRawBytes, err := item.Family(fid)
 	if err != nil {
 		return nil, errors.Wrapf(err, "measure %s.%s parse family %s", s.name, s.group, family)
 	}
+	if len(familyRawBytes) < 1 {
+		item.PrintContext(s.l.Named("tag-family"), fid, 10)
+	}
 	tagFamily := &modelv1.TagFamilyForWrite{}
 	err = proto.Unmarshal(familyRawBytes, tagFamily)
 	if err != nil {
@@ -155,10 +159,14 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
 			break
 		}
 	}
-	bytes, err := item.Family(familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval)))
+	fid := familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval))
+	bytes, err := item.Family(fid)
 	if err != nil {
 		return nil, err
 	}
+	if len(bytes) < 1 {
+		item.PrintContext(s.l.Named("field"), fid, 10)
+	}
 	fieldValue, err := pbv1.DecodeFieldValue(bytes, fieldSpec)
 	if err != nil {
 		return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 5ba3331..bdfde12 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -322,6 +322,7 @@ type BlockDelegate interface {
 	writeLSMIndex(fields []index.Field, id common.ItemID) error
 	writeInvertedIndex(fields []index.Field, id common.ItemID) error
 	dataReader() kv.TimeSeriesReader
+	decoderPool() encoding.SeriesDecoderPool
 	lsmIndexReader() index.Searcher
 	invertedIndexReader() index.Searcher
 	primaryIndexReader() index.FieldIterable
@@ -336,6 +337,10 @@ type bDelegate struct {
 	delegate *block
 }
 
+func (d *bDelegate) decoderPool() encoding.SeriesDecoderPool {
+	return d.delegate.encodingMethod.DecoderPool
+}
+
 func (d *bDelegate) dataReader() kv.TimeSeriesReader {
 	return d.delegate.store
 }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 09d62aa..4d44f30 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -104,9 +104,10 @@ func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, err
 		return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
 	}
 	return &item{
-		data:     b.dataReader(),
-		itemID:   id.ID,
-		seriesID: s.id,
+		data:        b.dataReader(),
+		itemID:      id.ID,
+		seriesID:    s.id,
+		decoderPool: b.decoderPool(),
 	}, b, nil
 }
 
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index b3f08e1..18db6e2 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,11 +18,19 @@
 package tsdb
 
 import (
+	"bytes"
+	"encoding/hex"
+	"time"
+
+	"github.com/dgraph-io/badger/v3/y"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type Iterator interface {
@@ -33,6 +41,7 @@ type Iterator interface {
 
 type Item interface {
 	Family(family []byte) ([]byte, error)
+	PrintContext(l *logger.Logger, family []byte, n int)
 	Val() ([]byte, error)
 	ID() common.ItemID
 	SortedField() []byte
@@ -59,6 +68,7 @@ type seekerBuilder struct {
 	order               modelv1.Sort
 	indexRuleForSorting *databasev1.IndexRule
 	rangeOptsForSorting index.RangeOpts
+	l                   *logger.Logger
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
@@ -75,6 +85,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
 func newSeekerBuilder(s *seriesSpan) SeekerBuilder {
 	return &seekerBuilder{
 		seriesSpan: s,
+		l:          logger.GetLogger("seeker-builder"),
 	}
 }
 
@@ -101,6 +112,7 @@ type item struct {
 	data        kv.TimeSeriesReader
 	seriesID    common.SeriesID
 	sortedField []byte
+	decoderPool encoding.SeriesDecoderPool
 }
 
 func (i *item) Time() uint64 {
@@ -119,6 +131,63 @@ func (i *item) Family(family []byte) ([]byte, error) {
 	return i.data.Get(d.marshal(), uint64(i.itemID))
 }
 
+func (i *item) PrintContext(l *logger.Logger, family []byte, n int) {
+	decoder := i.decoderPool.Get(family)
+	defer i.decoderPool.Put(decoder)
+	d := dataBucket{
+		seriesID: i.seriesID,
+		family:   family,
+	}
+	key := d.marshal()
+	pre, next := i.data.Context(key, uint64(i.itemID), n)
+	defer pre.Close()
+	defer next.Close()
+	j := 0
+	currentTS := uint64(i.itemID)
+
+	each := func(iter kv.Iterator) {
+		if !bytes.Equal(key, iter.Key()) {
+			return
+		}
+		j++
+
+		ts := y.ParseTs(iter.RawKey())
+
+		logEvent := l.Info().Int("i", j).
+			Time("ts", time.Unix(0, int64(y.ParseTs(iter.RawKey()))))
+		if err := decoder.Decode(family, iter.Val()); err != nil {
+			logEvent = logEvent.Str("loc", "mem")
+			if ts == currentTS {
+				logEvent = logEvent.Bool("at", true)
+			}
+		} else {
+			start, end := decoder.Range()
+			logEvent = logEvent.Time("start", time.Unix(0, int64(start))).
+				Time("end", time.Unix(0, int64(end))).Int("num", decoder.Len()).Str("loc", "table")
+			if start <= currentTS && currentTS <= end {
+				if dd, err := decoder.Get(currentTS); err == nil && len(dd) > 0 {
+					logEvent = logEvent.Bool("at", true)
+				}
+			}
+		}
+		logEvent.Send()
+	}
+
+	s := hex.EncodeToString(key)
+	if len(s) > 7 {
+		s = s[:7]
+	}
+	l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print previous lines")
+	for ; pre.Valid() && j < n; pre.Next() {
+		each(pre)
+	}
+	j = 0
+	l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print next lines")
+	for ; next.Valid() && j < n; next.Next() {
+		each(next)
+	}
+}
+
 func (i *item) Val() ([]byte, error) {
 	d := dataBucket{
 		seriesID: i.seriesID,
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 76bc194..910499e 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -28,6 +28,7 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -88,7 +89,8 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) {
 			return nil, err
 		}
 		if inner != nil {
-			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
+			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+				s.seriesSpan.seriesID, filters))
 		}
 	}
 	return
@@ -134,9 +136,11 @@ func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, error) {
 				return nil, err
 			}
 			if filter == nil {
-				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, emptyFilters))
+				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+					s.seriesSpan.seriesID, emptyFilters))
 			} else {
-				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, []filterFn{filter}))
+				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+					s.seriesSpan.seriesID, []filterFn{filter}))
 			}
 		}
 	}
@@ -156,6 +160,7 @@ type searcherIterator struct {
 	curKey        []byte
 	cur           posting.Iterator
 	data          kv.TimeSeriesReader
+	decoderPool   encoding.SeriesDecoderPool
 	seriesID      common.SeriesID
 	filters       []filterFn
 	l             *logger.Logger
@@ -193,6 +198,7 @@ func (s *searcherIterator) Val() Item {
 		itemID:      s.cur.Current(),
 		data:        s.data,
 		seriesID:    s.seriesID,
+		decoderPool: s.decoderPool,
 	}
 }
 
@@ -201,7 +207,7 @@ func (s *searcherIterator) Close() error {
 }
 
 func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
-	seriesID common.SeriesID, filters []filterFn,
+	decoderPool encoding.SeriesDecoderPool, seriesID common.SeriesID, filters []filterFn,
 ) Iterator {
 	return &searcherIterator{
 		fieldIterator: fieldIterator,
@@ -209,6 +215,7 @@ func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, da
 		seriesID:      seriesID,
 		filters:       filters,
 		l:             l,
+		decoderPool:   decoderPool,
 	}
 }
 
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 0382ee2..efc1f6e 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -79,6 +79,8 @@ type SeriesDecoder interface {
 	Get(ts uint64) ([]byte, error)
 	// Iterator returns a SeriesIterator
 	Iterator() SeriesIterator
+	// Range returns the start and end time of this series
+	Range() (start, end uint64)
 }
 
 // SeriesIterator iterates time series data
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index c9e1991..c9a48a4 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -138,8 +138,10 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
 	if ie.startTime == 0 {
 		ie.startTime = ts
 		ie.prevTime = ts
+	} else if ie.startTime > ts {
+		ie.startTime = ts
 	}
-	gap := int(ts) - int(ie.prevTime)
+	gap := int(ie.prevTime) - int(ts)
 	if gap < 0 {
 		return
 	}
@@ -166,13 +168,15 @@ func (ie *intEncoder) Reset(key []byte) {
 	ie.interval = ie.fn(key)
 	ie.startTime = 0
 	ie.prevTime = 0
+	ie.num = 0
+	ie.values = NewXOREncoder(ie.bw)
 }
 
 func (ie *intEncoder) Encode() ([]byte, error) {
 	ie.bw.Flush()
 	buffWriter := buffer.NewBufferWriter(ie.buff)
 	buffWriter.PutUint64(ie.startTime)
-	buffWriter.PutUint16(uint16(ie.size))
+	buffWriter.PutUint16(uint16(ie.num))
 	bb := buffWriter.Bytes()
 	encodedSize.WithLabelValues(ie.name, "int").Add(float64(len(bb)))
 	return bb, nil
@@ -195,6 +199,9 @@ type intDecoder struct {
 }
 
 func (i *intDecoder) Decode(key, data []byte) error {
+	if len(data) < 10 {
+		return ErrInvalidValue
+	}
 	i.interval = i.fn(key)
 	i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
 	i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -219,29 +226,33 @@ func (i intDecoder) Get(ts uint64) ([]byte, error) {
 	return zeroBytes, nil
 }
 
+func (i intDecoder) Range() (start, end uint64) {
+	return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
+}
+
 func (i intDecoder) Iterator() SeriesIterator {
 	br := bit.NewReader(bytes.NewReader(i.area))
 	return &intIterator{
-		startTime: i.startTime,
-		interval:  int(i.interval),
-		br:        br,
-		values:    NewXORDecoder(br),
-		size:      i.size,
+		endTime:  i.startTime + uint64(i.num*int(i.interval)),
+		interval: int(i.interval),
+		br:       br,
+		values:   NewXORDecoder(br),
+		size:     i.num,
 	}
 }
 
 var (
 	_         SeriesIterator = (*intIterator)(nil)
-	zeroBytes                = convert.Int64ToBytes(0)
-	Zero                     = convert.BytesToUint64(zeroBytes)
+	zeroBytes                = convert.Uint64ToBytes(zero)
+	zero                     = convert.BytesToUint64(convert.Int64ToBytes(0))
 )
 
 type intIterator struct {
-	startTime uint64
-	interval  int
-	size      int
-	br        *bit.Reader
-	values    *XORDecoder
+	endTime  uint64
+	interval int
+	size     int
+	br       *bit.Reader
+	values   *XORDecoder
 
 	currVal  uint64
 	currTime uint64
@@ -266,10 +277,10 @@ func (i *intIterator) Next() bool {
 			i.currVal = i.values.Value()
 		}
 	} else {
-		i.currVal = Zero
+		i.currVal = zero
 	}
-	i.currTime = i.startTime + uint64(i.interval*i.index)
 	i.index++
+	i.currTime = i.endTime - uint64(i.interval*i.index)
 	return true
 }
 
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
index 657bbe5..1f08278 100644
--- a/pkg/encoding/int_test.go
+++ b/pkg/encoding/int_test.go
@@ -28,8 +28,10 @@ import (
 
 func TestNewIntEncoderAndDecoder(t *testing.T) {
 	type tsData struct {
-		ts   []uint64
-		data []int64
+		ts    []uint64
+		data  []int64
+		start uint64
+		end   uint64
 	}
 	tests := []struct {
 		name string
@@ -39,45 +41,53 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
 		{
 			name: "golden path",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "more than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9, 6},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "less than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 8, 7},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
-				data: []int64{7, 8, 7},
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []int64{7, 8, 7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
 			},
 		},
 		{
 			name: "empty slot in the middle",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 0, 0, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 0, 0, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 	}
@@ -93,36 +103,49 @@ func TestNewIntEncoderAndDecoder(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			at := assert.New(t)
 			encoder := encoderPool.Get(key)
+			defer encoderPool.Put(encoder)
 			decoder := decoderPool.Get(key)
+			defer decoderPool.Put(decoder)
 			encoder.Reset(key)
+			isFull := false
 			for i, v := range tt.args.ts {
 				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
 				if encoder.IsFull() {
+					isFull = true
 					break
 				}
 			}
 			bb, err := encoder.Encode()
 			at.NoError(err)
+
+			at.Equal(tt.want.start, encoder.StartTime())
 			at.NoError(decoder.Decode(key, bb))
-			at.True(decoder.IsFull())
-			iter := decoder.Iterator()
-			for i, t := range tt.want.ts {
-				at.True(iter.Next())
+			start, end := decoder.Range()
+			at.Equal(tt.want.start, start)
+			at.Equal(tt.want.end, end)
+			if isFull {
+				at.True(decoder.IsFull())
+			}
+			i := 0
+			for iter := decoder.Iterator(); iter.Next(); i++ {
 				at.NoError(iter.Error())
 				at.Equal(tt.want.ts[i], iter.Time())
 				at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
-				v, err := decoder.Get(t)
+				v, err := decoder.Get(iter.Time())
 				at.NoError(err)
 				at.Equal(tt.want.data[i], convert.BytesToInt64(v))
 			}
+			at.Equal(len(tt.want.ts), i)
 		})
 	}
 }
 
 func TestNewIntDecoderGet(t *testing.T) {
 	type tsData struct {
-		ts   []uint64
-		data []int64
+		ts    []uint64
+		data  []int64
+		start uint64
+		end   uint64
 	}
 	tests := []struct {
 		name string
@@ -132,45 +155,53 @@ func TestNewIntDecoderGet(t *testing.T) {
 		{
 			name: "golden path",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "more than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9, 6},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(5 * time.Minute)},
-				data: []int64{7, 8, 7, 9, 0},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+				data:  []int64{7, 8, 7, 9, 0},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "less than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 8, 7},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
-				data: []int64{7, 8, 7},
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []int64{7, 8, 7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
 			},
 		},
 		{
 			name: "empty slot in the middle",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 0, 0, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 0, 0, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 	}
@@ -186,18 +217,29 @@ func TestNewIntDecoderGet(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			at := assert.New(t)
 			encoder := encoderPool.Get(key)
+			defer encoderPool.Put(encoder)
 			decoder := decoderPool.Get(key)
+			defer decoderPool.Put(decoder)
 			encoder.Reset(key)
+			isFull := false
 			for i, v := range tt.args.ts {
 				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
 				if encoder.IsFull() {
+					isFull = true
 					break
 				}
 			}
 			bb, err := encoder.Encode()
 			at.NoError(err)
+
+			at.Equal(tt.want.start, encoder.StartTime())
 			at.NoError(decoder.Decode(key, bb))
-			at.True(decoder.IsFull())
+			start, end := decoder.Range()
+			at.Equal(tt.want.start, start)
+			at.Equal(tt.want.end, end)
+			if isFull {
+				at.True(decoder.IsFull())
+			}
 			for i, t := range tt.want.ts {
 				v, err := decoder.Get(t)
 				at.NoError(err)
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 2284273..388ee8c 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -202,6 +202,9 @@ func (t *plainDecoder) Len() int {
 }
 
 func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
+	if len(rawData) < 2 {
+		return ErrInvalidValue
+	}
 	var data []byte
 	size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
 	if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
@@ -242,6 +245,12 @@ func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
 	return getVal(t.val, parseOffset(slot))
 }
 
+func (t *plainDecoder) Range() (start, end uint64) {
+	startSlot := getTSSlot(t.ts, int(t.num)-1)
+	endSlot := getTSSlot(t.ts, 0)
+	return parseTS(startSlot), parseTS(endSlot)
+}
+
 func (t *plainDecoder) Iterator() SeriesIterator {
 	return newBlockItemIterator(t)
 }
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index fe94b44..8259d7d 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -266,6 +266,10 @@ func (di *delegateIterator) Key() []byte {
 	return di.delegated.Key()
 }
 
+func (di *delegateIterator) RawKey() []byte {
+	return di.delegated.RawKey()
+}
+
 func (di *delegateIterator) Field() Field {
 	return di.curField
 }
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index f56a68b..3d22f92 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -32,14 +32,13 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/encoding"
 )
 
 type ID string
 
 const fieldFlagLength = 9
 
-var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(encoding.Zero)}}}
+var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 0}}}
 
 var (
 	strDelimiter = []byte("\n")
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index d7722d4..3640595 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -153,7 +153,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
 		projectionTagsRefs:   i.projectionTagsRefs,
 		projectionFieldsRefs: i.projectionFieldsRefs,
 	}
-	if len(iters) == 1 || i.groupByEntity {
+	if i.groupByEntity {
 		return newSeriesMIterator(iters, transformContext), nil
 	}
 	c := logical.CreateComparator(i.Sort)