Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/01/14 04:43:32 UTC

[skywalking-banyandb] branch main updated: Support 64-bit float field (#240)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5308398  Support 64-bit float field (#240)
5308398 is described below

commit 530839850da6ce4e2735ededbf7b13fb9fa389ba
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Jan 14 12:43:26 2023 +0800

    Support 64-bit float field (#240)
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/proto/banyandb/database/v1/schema.proto        |   1 +
 api/proto/banyandb/model/v1/common.proto           |   5 +
 banyand/measure/encode.go                          |   4 +-
 banyand/measure/measure_suite_test.go              |   4 +-
 banyand/measure/measure_write.go                   |   2 +
 banyand/measure/metadata_test.go                   |   6 +-
 banyand/query/processor_topn.go                    |   4 +-
 docs/api-reference.md                              |  18 +
 pkg/convert/number.go                              |  13 +
 pkg/encoding/{int.go => encoder.go}                |  94 +++--
 pkg/encoding/encoder_test.go                       | 392 +++++++++++++++++++++
 pkg/encoding/int_test.go                           | 251 -------------
 pkg/pb/v1/metadata.go                              |   6 +-
 pkg/pb/v1/write.go                                 |   9 +
 pkg/query/aggregation/aggregation.go               |  93 ++++-
 pkg/query/aggregation/function.go                  |  85 ++---
 .../logical/measure/measure_plan_aggregation.go    | 144 +++++---
 pkg/schema/metadata.go                             |   2 +
 .../testdata/measures/instance_clr_cpu_minute.json |  50 +++
 test/cases/measure/data/input/float.yaml           |  26 ++
 test/cases/measure/data/input/float_agg_min.yaml   |  29 ++
 .../testdata/instance_clr_cpu_minute_data.json     | 107 ++++++
 test/cases/measure/data/want/float.yaml            |  93 +++++
 test/cases/measure/data/want/float_agg_min.yaml    |  34 ++
 test/cases/measure/measure.go                      |   2 +
 test/integration/cold_query/query_suite_test.go    |  29 +-
 test/integration/query/query_suite_test.go         |  29 +-
 27 files changed, 1074 insertions(+), 458 deletions(-)

diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index 90555ad..929c77b 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -72,6 +72,7 @@ enum FieldType {
   FIELD_TYPE_STRING = 1;
   FIELD_TYPE_INT = 2;
   FIELD_TYPE_DATA_BINARY = 3;
+  FIELD_TYPE_FLOAT = 4;
 }
 
 enum EncodingMethod {
diff --git a/api/proto/banyandb/model/v1/common.proto b/api/proto/banyandb/model/v1/common.proto
index d3cf5e6..806eb0a 100644
--- a/api/proto/banyandb/model/v1/common.proto
+++ b/api/proto/banyandb/model/v1/common.proto
@@ -36,6 +36,10 @@ message Int {
   int64 value = 1;
 }
 
+message Float {
+  double value = 1;
+}
+
 message StrArray {
   repeated string value = 1;
 }
@@ -66,6 +70,7 @@ message FieldValue {
     model.v1.Str str = 2;
     model.v1.Int int = 3;
     bytes binary_data = 4;
+    model.v1.Float float = 5;
   }
 }
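
For reference, a client populates the new oneof member the same way as the existing Int variant. A minimal sketch in Go, using the generated bindings exactly as they appear later in this patch:

    package main

    import (
        "fmt"

        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
    )

    func main() {
        // Wrap a 64-bit float in the FieldValue oneof, mirroring the Int variant.
        fv := &modelv1.FieldValue{
            Value: &modelv1.FieldValue_Float{
                Float: &modelv1.Float{Value: 18.1},
            },
        }
        fmt.Println(fv.GetFloat().GetValue()) // 18.1
    }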
 
diff --git a/banyand/measure/encode.go b/banyand/measure/encode.go
index d446828..452fbf9 100644
--- a/banyand/measure/encode.go
+++ b/banyand/measure/encode.go
@@ -45,7 +45,7 @@ type encoderPool struct {
 
 func newEncoderPool(name string, size int, l *logger.Logger) encoding.SeriesEncoderPool {
 	return &encoderPool{
-		intPool: encoding.NewIntEncoderPool(name, size, intervalFn),
+		intPool: encoding.NewEncoderPool(name, size, intervalFn),
 		l:       l,
 	}
 }
@@ -72,7 +72,7 @@ type decoderPool struct {
 
 func newDecoderPool(name string, size int, l *logger.Logger) encoding.SeriesDecoderPool {
 	return &decoderPool{
-		intPool: encoding.NewIntDecoderPool(name, size, intervalFn),
+		intPool: encoding.NewDecoderPool(name, size, intervalFn),
 		l:       l,
 	}
 }
diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go
index 8e33370..05f53a8 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -75,8 +75,8 @@ func setUp() (*services, func()) {
 	repo := discovery.NewMockServiceRepo(ctrl)
 	repo.EXPECT().NodeID().AnyTimes()
 	// Both PreRun and Serve phases send events
-	repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 8)
-	repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2)
+	repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
+	repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
 
 	// Init Pipeline
 	pipeline, err := queue.NewQueue(context.TODO(), repo)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index a29c535..b91d5a0 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -177,6 +177,8 @@ func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
 	switch fieldValue.GetValue().(type) {
 	case *modelv1.FieldValue_Int:
 		return convert.Int64ToBytes(fieldValue.GetInt().GetValue())
+	case *modelv1.FieldValue_Float:
+		return convert.Float64ToBytes(fieldValue.GetFloat().GetValue())
 	case *modelv1.FieldValue_Str:
 		return []byte(fieldValue.GetStr().Value)
 	case *modelv1.FieldValue_BinaryData:
diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go
index 9236c72..66c83c0 100644
--- a/banyand/measure/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -65,8 +65,8 @@ var _ = Describe("Metadata", func() {
 		})
 
 		It("should add shards", func() {
-			svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
-			svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
+			svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).AnyTimes()
+			svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
 			groupSchema, err := svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "sw_metric")
 			Expect(err).ShouldNot(HaveOccurred())
 			Expect(groupSchema).ShouldNot(BeNil())
@@ -126,7 +126,7 @@ var _ = Describe("Metadata", func() {
 			})
 
 			It("should update a new measure", func() {
-				svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
+				svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
 				// Remove the first tag from the entity
 				measureSchema.Entity.TagNames = measureSchema.Entity.TagNames[1:]
 				entitySize := len(measureSchema.Entity.TagNames)
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 59abc23..e72520a 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -236,7 +236,7 @@ func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request *measurev1.T
 var _ heap.Interface = (*postAggregationProcessor)(nil)
 
 type aggregatorItem struct {
-	int64Func aggregation.Int64Func
+	int64Func aggregation.Func[int64]
 	key       string
 	index     int
 }
@@ -324,7 +324,7 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
 		return nil
 	}
 
-	aggrFunc, err := aggregation.NewInt64Func(aggr.aggrFunc)
+	aggrFunc, err := aggregation.NewFunc[int64](aggr.aggrFunc)
 	if err != nil {
 		return err
 	}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 2c5839b..e969269 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -25,6 +25,7 @@
   
 - [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto)
     - [FieldValue](#banyandb-model-v1-FieldValue)
+    - [Float](#banyandb-model-v1-Float)
     - [ID](#banyandb-model-v1-ID)
     - [Int](#banyandb-model-v1-Int)
     - [IntArray](#banyandb-model-v1-IntArray)
@@ -479,6 +480,22 @@ Metadata is for multi-tenant, multi-model use
 | str | [Str](#banyandb-model-v1-Str) |  |  |
 | int | [Int](#banyandb-model-v1-Int) |  |  |
 | binary_data | [bytes](#bytes) |  |  |
+| float | [Float](#banyandb-model-v1-Float) |  |  |
+
+
+
+
+
+
+<a name="banyandb-model-v1-Float"></a>
+
+### Float
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| value | [double](#double) |  |  |
 
 
 
@@ -1073,6 +1090,7 @@ TopNAggregation generates offline TopN statistics for a measure&#39;s TopN appro
 | FIELD_TYPE_STRING | 1 |  |
 | FIELD_TYPE_INT | 2 |  |
 | FIELD_TYPE_DATA_BINARY | 3 |  |
+| FIELD_TYPE_FLOAT | 4 |  |
 
 
 
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index f94c252..0074c58 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -19,6 +19,7 @@ package convert
 
 import (
 	"encoding/binary"
+	"math"
 )
 
 // Uint64ToBytes converts uint64 to bytes.
@@ -74,3 +75,15 @@ func BytesToUint64(b []byte) uint64 {
 func BytesToUint32(b []byte) uint32 {
 	return binary.BigEndian.Uint32(b)
 }
+
+// Float64ToBytes converts float64 to bytes.
+func Float64ToBytes(f float64) []byte {
+	bs := make([]byte, 8)
+	binary.BigEndian.PutUint64(bs, math.Float64bits(f))
+	return bs
+}
+
+// BytesToFloat64 converts bytes to float64.
+func BytesToFloat64(b []byte) float64 {
+	return math.Float64frombits(binary.BigEndian.Uint64(b))
+}
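
The two helpers are symmetric: Float64ToBytes stores the big-endian bytes of the IEEE-754 bit pattern, so the round trip through BytesToFloat64 is lossless. A short usage sketch:

    package main

    import (
        "fmt"

        "github.com/apache/skywalking-banyandb/pkg/convert"
    )

    func main() {
        b := convert.Float64ToBytes(33.333) // 8 bytes: big-endian IEEE-754 bits
        f := convert.BytesToFloat64(b)      // exact round trip
        fmt.Println(len(b), f)              // 8 33.333
    }
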
diff --git a/pkg/encoding/int.go b/pkg/encoding/encoder.go
similarity index 68%
rename from pkg/encoding/int.go
rename to pkg/encoding/encoder.go
index ed81b72..f5c4838 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/encoder.go
@@ -20,46 +20,48 @@ package encoding
 import (
 	"bytes"
 	"encoding/binary"
-	"errors"
 	"io"
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
 var (
-	intEncoderPool = sync.Pool{
-		New: newIntEncoder,
+	encoderPool = sync.Pool{
+		New: newEncoder,
 	}
-	intDecoderPool = sync.Pool{
+	decoderPool = sync.Pool{
 		New: func() interface{} {
-			return &intDecoder{}
+			return &decoder{}
 		},
 	}
 
 	errInvalidValue = errors.New("invalid encoded value")
+	errNoData       = errors.New("there is no data")
 )
 
-type intEncoderPoolDelegator struct {
+type encoderPoolDelegator struct {
 	pool *sync.Pool
 	fn   ParseInterval
 	name string
 	size int
 }
 
-// NewIntEncoderPool returns a SeriesEncoderPool which provides int-based xor encoders.
-func NewIntEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool {
-	return &intEncoderPoolDelegator{
+// NewEncoderPool returns a SeriesEncoderPool which provides xor-based encoders for 64-bit values.
+func NewEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool {
+	return &encoderPoolDelegator{
 		name: name,
-		pool: &intEncoderPool,
+		pool: &encoderPool,
 		size: size,
 		fn:   fn,
 	}
 }
 
-func (b *intEncoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) SeriesEncoder {
-	encoder := b.pool.Get().(*intEncoder)
+func (b *encoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) SeriesEncoder {
+	encoder := b.pool.Get().(*encoder)
 	encoder.name = b.name
 	encoder.size = b.size
 	encoder.fn = b.fn
@@ -67,51 +69,51 @@ func (b *intEncoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) Seri
 	return encoder
 }
 
-func (b *intEncoderPoolDelegator) Put(encoder SeriesEncoder) {
-	_, ok := encoder.(*intEncoder)
+func (b *encoderPoolDelegator) Put(seriesEncoder SeriesEncoder) {
+	_, ok := seriesEncoder.(*encoder)
 	if ok {
-		b.pool.Put(encoder)
+		b.pool.Put(seriesEncoder)
 	}
 }
 
-type intDecoderPoolDelegator struct {
+type decoderPoolDelegator struct {
 	pool *sync.Pool
 	fn   ParseInterval
 	name string
 	size int
 }
 
-// NewIntDecoderPool returns a SeriesDecoderPool which provides int-based xor decoders.
-func NewIntDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool {
-	return &intDecoderPoolDelegator{
+// NewDecoderPool returns a SeriesDecoderPool which provides xor-based decoders for 64-bit values.
+func NewDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool {
+	return &decoderPoolDelegator{
 		name: name,
-		pool: &intDecoderPool,
+		pool: &decoderPool,
 		size: size,
 		fn:   fn,
 	}
 }
 
-func (b *intDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
-	decoder := b.pool.Get().(*intDecoder)
+func (b *decoderPoolDelegator) Get(_ []byte) SeriesDecoder {
+	decoder := b.pool.Get().(*decoder)
 	decoder.name = b.name
 	decoder.size = b.size
 	decoder.fn = b.fn
 	return decoder
 }
 
-func (b *intDecoderPoolDelegator) Put(decoder SeriesDecoder) {
-	_, ok := decoder.(*intDecoder)
+func (b *decoderPoolDelegator) Put(seriesDecoder SeriesDecoder) {
+	_, ok := seriesDecoder.(*decoder)
 	if ok {
-		b.pool.Put(decoder)
+		b.pool.Put(seriesDecoder)
 	}
 }
 
-var _ SeriesEncoder = (*intEncoder)(nil)
+var _ SeriesEncoder = (*encoder)(nil)
 
 // ParseInterval parses the interval rule from the key in a kv pair.
 type ParseInterval = func(key []byte) time.Duration
 
-type intEncoder struct {
+type encoder struct {
 	buff      BufferWriter
 	bw        *Writer
 	values    *XOREncoder
@@ -124,15 +126,15 @@ type intEncoder struct {
 	size      int
 }
 
-func newIntEncoder() interface{} {
+func newEncoder() interface{} {
 	bw := NewWriter()
-	return &intEncoder{
+	return &encoder{
 		bw:     bw,
 		values: NewXOREncoder(bw),
 	}
 }
 
-func (ie *intEncoder) Append(ts uint64, value []byte) {
+func (ie *encoder) Append(ts uint64, value []byte) {
 	if len(value) > 8 {
 		return
 	}
@@ -160,11 +162,11 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
 	rawSize.WithLabelValues(ie.name, "int").Add(float64(l + 8))
 }
 
-func (ie *intEncoder) IsFull() bool {
+func (ie *encoder) IsFull() bool {
 	return ie.num >= ie.size
 }
 
-func (ie *intEncoder) Reset(key []byte, buffer BufferWriter) {
+func (ie *encoder) Reset(key []byte, buffer BufferWriter) {
 	ie.buff = buffer
 	ie.bw.Reset(buffer)
 	ie.interval = ie.fn(key)
@@ -174,7 +176,7 @@ func (ie *intEncoder) Reset(key []byte, buffer BufferWriter) {
 	ie.values = NewXOREncoder(ie.bw)
 }
 
-func (ie *intEncoder) Encode() error {
+func (ie *encoder) Encode() error {
 	ie.bw.Flush()
 	buffWriter := NewPacker(ie.buff)
 	buffWriter.PutUint64(ie.startTime)
@@ -184,13 +186,13 @@ func (ie *intEncoder) Encode() error {
 	return nil
 }
 
-func (ie *intEncoder) StartTime() uint64 {
+func (ie *encoder) StartTime() uint64 {
 	return ie.startTime
 }
 
-var _ SeriesDecoder = (*intDecoder)(nil)
+var _ SeriesDecoder = (*decoder)(nil)
 
-type intDecoder struct {
+type decoder struct {
 	fn        ParseInterval
 	name      string
 	area      []byte
@@ -200,7 +202,7 @@ type intDecoder struct {
 	num       int
 }
 
-func (i *intDecoder) Decode(key, data []byte) error {
+func (i *decoder) Decode(key, data []byte) error {
 	if len(data) < 10 {
 		return errInvalidValue
 	}
@@ -211,28 +213,28 @@ func (i *intDecoder) Decode(key, data []byte) error {
 	return nil
 }
 
-func (i intDecoder) Len() int {
+func (i decoder) Len() int {
 	return i.num
 }
 
-func (i intDecoder) IsFull() bool {
+func (i decoder) IsFull() bool {
 	return i.num >= i.size
 }
 
-func (i intDecoder) Get(ts uint64) ([]byte, error) {
+func (i decoder) Get(ts uint64) ([]byte, error) {
 	for iter := i.Iterator(); iter.Next(); {
 		if iter.Time() == ts {
 			return iter.Val(), nil
 		}
 	}
-	return zeroBytes, nil
+	return nil, errors.WithMessagef(errNoData, "ts:%d", ts)
 }
 
-func (i intDecoder) Range() (start, end uint64) {
+func (i decoder) Range() (start, end uint64) {
 	return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
 }
 
-func (i intDecoder) Iterator() SeriesIterator {
+func (i decoder) Iterator() SeriesIterator {
 	br := NewReader(bytes.NewReader(i.area))
 	return &intIterator{
 		endTime:  i.startTime + uint64(i.num*int(i.interval)),
@@ -243,11 +245,7 @@ func (i intDecoder) Iterator() SeriesIterator {
 	}
 }
 
-var (
-	_         SeriesIterator = (*intIterator)(nil)
-	zeroBytes                = convert.Uint64ToBytes(zero)
-	zero                     = convert.BytesToUint64(convert.Int64ToBytes(0))
-)
+var _ SeriesIterator = (*intIterator)(nil)
 
 type intIterator struct {
 	err      error
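
After the rename, callers obtain encoders from the generic pool and append 8-byte values no matter whether the bytes carry int64 or float64 bits; only the conversion at the call site differs. A minimal sketch that follows the usage in the new encoder_test.go below:

    package main

    import (
        "bytes"
        "time"

        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
    )

    func main() {
        // The interval function maps a series key to its sampling interval.
        fn := func(key []byte) time.Duration { return time.Minute }
        pool := encoding.NewEncoderPool("minute", 4, fn)

        var buf bytes.Buffer
        enc := pool.Get([]byte("foo"), &buf)
        defer pool.Put(enc)

        // Float samples travel as their raw 8-byte representation.
        enc.Append(uint64(2*time.Minute), convert.Float64ToBytes(18.1))
        enc.Append(uint64(1*time.Minute), convert.Float64ToBytes(50.2))
        _ = enc.Encode() // flushes the XOR-compressed block into buf
    }
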
diff --git a/pkg/encoding/encoder_test.go b/pkg/encoding/encoder_test.go
new file mode 100644
index 0000000..840a883
--- /dev/null
+++ b/pkg/encoding/encoder_test.go
@@ -0,0 +1,392 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"bytes"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func TestNewEncoderAndDecoder(t *testing.T) {
+	type tsData struct {
+		ts    []uint64
+		data  []any
+		start uint64
+		end   uint64
+	}
+	tests := []struct {
+		name string
+		args tsData
+		want tsData
+	}{
+		{
+			name: "int golden path",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data: []any{7, 8, 7, 9},
+			},
+			want: tsData{
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []any{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "int more than the size",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
+				data: []any{7, 8, 7, 9, 6},
+			},
+			want: tsData{
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []any{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "int less than the size",
+			args: tsData{
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data: []any{7, 8, 7},
+			},
+			want: tsData{
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []any{7, 8, 7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
+			},
+		},
+		{
+			name: "int empty slot in the middle",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
+				data: []any{7, 9},
+			},
+			want: tsData{
+				ts:    []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)},
+				data:  []any{7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "float64 golden path",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data: []any{7.0, 8.0, 7.0, 9.0},
+			},
+			want: tsData{
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []any{7.0, 8.0, 7.0, 9.0},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "float64 more than the size",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
+				data: []any{0.7, 0.8, 0.7, 0.9, 0.6},
+			},
+			want: tsData{
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []any{0.7, 0.8, 0.7, 0.9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "float64 less than the size",
+			args: tsData{
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data: []any{1.7, 1.8, 1.7},
+			},
+			want: tsData{
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []any{1.7, 1.8, 1.7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
+			},
+		},
+		{
+			name: "float64 empty slot in the middle",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
+				data: []any{0.700033, 0.988822},
+			},
+			want: tsData{
+				ts:    []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)},
+				data:  []any{0.700033, 0.988822},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
+			},
+		},
+	}
+	key := []byte("foo")
+	fn := func(k []byte) time.Duration {
+		assert.Equal(t, key, k)
+		return 1 * time.Minute
+	}
+	encoderPool := NewEncoderPool("minute", 4, fn)
+	decoderPool := NewDecoderPool("minute", 4, fn)
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			at := assert.New(t)
+			var buffer bytes.Buffer
+			encoder := encoderPool.Get(key, &buffer)
+			defer encoderPool.Put(encoder)
+			decoder := decoderPool.Get(key)
+			defer decoderPool.Put(decoder)
+			isFull := false
+			for i, v := range tt.args.ts {
+				encoder.Append(v, ToBytes(tt.args.data[i]))
+				if encoder.IsFull() {
+					isFull = true
+					break
+				}
+			}
+			err := encoder.Encode()
+			at.NoError(err)
+
+			at.Equal(tt.want.start, encoder.StartTime())
+			at.NoError(decoder.Decode(key, buffer.Bytes()))
+			start, end := decoder.Range()
+			at.Equal(tt.want.start, start)
+			at.Equal(tt.want.end, end)
+			if isFull {
+				at.True(decoder.IsFull())
+			}
+			i := 0
+			for iter := decoder.Iterator(); iter.Next(); i++ {
+				at.NoError(iter.Error())
+				at.Equal(tt.want.ts[i], iter.Time())
+				at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], iter.Val()))
+				v, err := decoder.Get(iter.Time())
+				at.NoError(err)
+				at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], v))
+			}
+			at.Equal(len(tt.want.ts), i)
+		})
+	}
+}
+
+func ToBytes(v any) []byte {
+	switch d := v.(type) {
+	case int:
+		return convert.Int64ToBytes(int64(d))
+	case float64:
+		return convert.Float64ToBytes(d)
+	}
+	return nil
+}
+
+func BytesTo(t any, b []byte) any {
+	switch t.(type) {
+	case int:
+		return int(convert.BytesToInt64(b))
+	case float64:
+		return convert.BytesToFloat64(b)
+	}
+	return nil
+}
+
+func TestNewDecoderGet(t *testing.T) {
+	type tsData struct {
+		ts   []uint64
+		data []any
+	}
+	type wantData struct {
+		ts      []uint64
+		data    []any
+		wantErr []bool
+		start   uint64
+		end     uint64
+	}
+	tests := []struct {
+		name string
+		args tsData
+		want wantData
+	}{
+		{
+			name: "int golden path",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data: []any{7, 8, 7, 9},
+			},
+			want: wantData{
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []any{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "int more than the size",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
+				data: []any{7, 8, 7, 9, 6},
+			},
+			want: wantData{
+				ts:      []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+				data:    []any{7, 8, 7, 9, nil},
+				wantErr: []bool{false, false, false, false, true},
+				start:   uint64(time.Minute),
+				end:     uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "int less than the size",
+			args: tsData{
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data: []any{7, 8, 7},
+			},
+			want: wantData{
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []any{7, 8, 7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
+			},
+		},
+		{
+			name: "int empty slot in the middle",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
+				data: []any{7, 9},
+			},
+			want: wantData{
+				ts:      []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:    []any{7, nil, nil, 9},
+				wantErr: []bool{false, true, true, false},
+				start:   uint64(time.Minute),
+				end:     uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "float golden path",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data: []any{7.0, 8.0, 7.0, 9.0},
+			},
+			want: wantData{
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []any{7.0, 8.0, 7.0, 9.0},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "float more than the size",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
+				data: []any{1.7, 1.8, 1.7, 1.9, 1.6},
+			},
+			want: wantData{
+				ts:      []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+				data:    []any{1.7, 1.8, 1.7, 1.9, nil},
+				wantErr: []bool{false, false, false, false, true},
+				start:   uint64(time.Minute),
+				end:     uint64(4 * time.Minute),
+			},
+		},
+		{
+			name: "float less than the size",
+			args: tsData{
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data: []any{0.71, 0.833, 0.709},
+			},
+			want: wantData{
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []any{0.71, 0.833, 0.709},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
+			},
+		},
+		{
+			name: "float empty slot in the middle",
+			args: tsData{
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
+				data: []any{1.7, 1.9},
+			},
+			want: wantData{
+				ts:      []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:    []any{1.7, nil, nil, 1.9},
+				wantErr: []bool{false, true, true, false},
+				start:   uint64(time.Minute),
+				end:     uint64(4 * time.Minute),
+			},
+		},
+	}
+	key := []byte("foo")
+	fn := func(k []byte) time.Duration {
+		assert.Equal(t, key, k)
+		return 1 * time.Minute
+	}
+	encoderPool := NewEncoderPool("minute", 4, fn)
+	decoderPool := NewDecoderPool("minute", 4, fn)
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			at := assert.New(t)
+			var buffer bytes.Buffer
+			encoder := encoderPool.Get(key, &buffer)
+			defer encoderPool.Put(encoder)
+			decoder := decoderPool.Get(key)
+			defer decoderPool.Put(decoder)
+			isFull := false
+			for i, v := range tt.args.ts {
+				encoder.Append(v, ToBytes(tt.args.data[i]))
+				if encoder.IsFull() {
+					isFull = true
+					break
+				}
+			}
+			err := encoder.Encode()
+			at.NoError(err)
+
+			at.Equal(tt.want.start, encoder.StartTime())
+			at.NoError(decoder.Decode(key, buffer.Bytes()))
+			start, end := decoder.Range()
+			at.Equal(tt.want.start, start)
+			at.Equal(tt.want.end, end)
+			if isFull {
+				at.True(decoder.IsFull())
+			}
+			for i, t := range tt.want.ts {
+				wantErr := false
+				if tt.want.wantErr != nil {
+					wantErr = tt.want.wantErr[i]
+				}
+				v, err := decoder.Get(t)
+				if wantErr {
+					at.ErrorIs(err, errNoData)
+				} else {
+					at.NoError(err)
+					at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], v))
+				}
+			}
+		})
+	}
+}
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
deleted file mode 100644
index 82767ea..0000000
--- a/pkg/encoding/int_test.go
+++ /dev/null
@@ -1,251 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package encoding
-
-import (
-	"bytes"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-)
-
-func TestNewIntEncoderAndDecoder(t *testing.T) {
-	type tsData struct {
-		ts    []uint64
-		data  []int64
-		start uint64
-		end   uint64
-	}
-	tests := []struct {
-		name string
-		args tsData
-		want tsData
-	}{
-		{
-			name: "golden path",
-			args: tsData{
-				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
-			},
-			want: tsData{
-				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-				data:  []int64{7, 8, 7, 9},
-				start: uint64(time.Minute),
-				end:   uint64(4 * time.Minute),
-			},
-		},
-		{
-			name: "more than the size",
-			args: tsData{
-				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
-				data: []int64{7, 8, 7, 9, 6},
-			},
-			want: tsData{
-				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-				data:  []int64{7, 8, 7, 9},
-				start: uint64(time.Minute),
-				end:   uint64(4 * time.Minute),
-			},
-		},
-		{
-			name: "less than the size",
-			args: tsData{
-				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
-				data: []int64{7, 8, 7},
-			},
-			want: tsData{
-				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
-				data:  []int64{7, 8, 7},
-				start: uint64(time.Minute),
-				end:   uint64(3 * time.Minute),
-			},
-		},
-		{
-			name: "empty slot in the middle",
-			args: tsData{
-				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
-				data: []int64{7, 9},
-			},
-			want: tsData{
-				ts:    []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)},
-				data:  []int64{7, 9},
-				start: uint64(time.Minute),
-				end:   uint64(4 * time.Minute),
-			},
-		},
-	}
-	key := []byte("foo")
-	fn := func(k []byte) time.Duration {
-		assert.Equal(t, key, k)
-		return 1 * time.Minute
-	}
-	encoderPool := NewIntEncoderPool("minute", 4, fn)
-	decoderPool := NewIntDecoderPool("minute", 4, fn)
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			at := assert.New(t)
-			var buffer bytes.Buffer
-			encoder := encoderPool.Get(key, &buffer)
-			defer encoderPool.Put(encoder)
-			decoder := decoderPool.Get(key)
-			defer decoderPool.Put(decoder)
-			isFull := false
-			for i, v := range tt.args.ts {
-				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
-				if encoder.IsFull() {
-					isFull = true
-					break
-				}
-			}
-			err := encoder.Encode()
-			at.NoError(err)
-
-			at.Equal(tt.want.start, encoder.StartTime())
-			at.NoError(decoder.Decode(key, buffer.Bytes()))
-			start, end := decoder.Range()
-			at.Equal(tt.want.start, start)
-			at.Equal(tt.want.end, end)
-			if isFull {
-				at.True(decoder.IsFull())
-			}
-			i := 0
-			for iter := decoder.Iterator(); iter.Next(); i++ {
-				at.NoError(iter.Error())
-				at.Equal(tt.want.ts[i], iter.Time())
-				at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
-				v, err := decoder.Get(iter.Time())
-				at.NoError(err)
-				at.Equal(tt.want.data[i], convert.BytesToInt64(v))
-			}
-			at.Equal(len(tt.want.ts), i)
-		})
-	}
-}
-
-func TestNewIntDecoderGet(t *testing.T) {
-	type tsData struct {
-		ts    []uint64
-		data  []int64
-		start uint64
-		end   uint64
-	}
-	tests := []struct {
-		name string
-		args tsData
-		want tsData
-	}{
-		{
-			name: "golden path",
-			args: tsData{
-				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
-			},
-			want: tsData{
-				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-				data:  []int64{7, 8, 7, 9},
-				start: uint64(time.Minute),
-				end:   uint64(4 * time.Minute),
-			},
-		},
-		{
-			name: "more than the size",
-			args: tsData{
-				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
-				data: []int64{7, 8, 7, 9, 6},
-			},
-			want: tsData{
-				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
-				data:  []int64{7, 8, 7, 9, 0},
-				start: uint64(time.Minute),
-				end:   uint64(4 * time.Minute),
-			},
-		},
-		{
-			name: "less than the size",
-			args: tsData{
-				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
-				data: []int64{7, 8, 7},
-			},
-			want: tsData{
-				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
-				data:  []int64{7, 8, 7},
-				start: uint64(time.Minute),
-				end:   uint64(3 * time.Minute),
-			},
-		},
-		{
-			name: "empty slot in the middle",
-			args: tsData{
-				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
-				data: []int64{7, 9},
-			},
-			want: tsData{
-				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-				data:  []int64{7, 0, 0, 9},
-				start: uint64(time.Minute),
-				end:   uint64(4 * time.Minute),
-			},
-		},
-	}
-	key := []byte("foo")
-	fn := func(k []byte) time.Duration {
-		assert.Equal(t, key, k)
-		return 1 * time.Minute
-	}
-	encoderPool := NewIntEncoderPool("minute", 4, fn)
-	decoderPool := NewIntDecoderPool("minute", 4, fn)
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			at := assert.New(t)
-			var buffer bytes.Buffer
-			encoder := encoderPool.Get(key, &buffer)
-			defer encoderPool.Put(encoder)
-			decoder := decoderPool.Get(key)
-			defer decoderPool.Put(decoder)
-			isFull := false
-			for i, v := range tt.args.ts {
-				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
-				if encoder.IsFull() {
-					isFull = true
-					break
-				}
-			}
-			err := encoder.Encode()
-			at.NoError(err)
-
-			at.Equal(tt.want.start, encoder.StartTime())
-			at.NoError(decoder.Decode(key, buffer.Bytes()))
-			start, end := decoder.Range()
-			at.Equal(tt.want.start, start)
-			at.Equal(tt.want.end, end)
-			if isFull {
-				at.True(decoder.IsFull())
-			}
-			for i, t := range tt.want.ts {
-				v, err := decoder.Get(t)
-				at.NoError(err)
-				at.Equal(tt.want.data[i], convert.BytesToInt64(v))
-			}
-		})
-	}
-}
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 7de4753..373c0e4 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -57,10 +57,12 @@ func tagValueTypeConv(tagValue *modelv1.TagValue) (tagType databasev1.TagType, i
 }
 
 // FieldValueTypeConv recognizes the field type from its value.
-func FieldValueTypeConv(tagValue *modelv1.FieldValue) (tagType databasev1.FieldType, isNull bool) {
-	switch tagValue.GetValue().(type) {
+func FieldValueTypeConv(fieldValue *modelv1.FieldValue) (fieldType databasev1.FieldType, isNull bool) {
+	switch fieldValue.GetValue().(type) {
 	case *modelv1.FieldValue_Int:
 		return databasev1.FieldType_FIELD_TYPE_INT, false
+	case *modelv1.FieldValue_Float:
+		return databasev1.FieldType_FIELD_TYPE_FLOAT, false
 	case *modelv1.FieldValue_Str:
 		return databasev1.FieldType_FIELD_TYPE_STRING, false
 	case *modelv1.FieldValue_BinaryData:
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index 03c470b..600df45 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -190,6 +190,15 @@ func DecodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) (*mode
 				hex.EncodeToString(fieldValue), len(fieldValue))
 		}
 		return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: convert.BytesToInt64(fieldValue)}}}, nil
+	case databasev1.FieldType_FIELD_TYPE_FLOAT:
+		if len(fieldValue) == 0 {
+			return zeroFieldValue, nil
+		}
+		if len(fieldValue) != 8 {
+			return nil, errors.WithMessagef(errMalformedField, "the length of encoded field value (float64) %s is %d, expected 8",
+				hex.EncodeToString(fieldValue), len(fieldValue))
+		}
+		return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: convert.BytesToFloat64(fieldValue)}}}, nil
 	case databasev1.FieldType_FIELD_TYPE_DATA_BINARY:
 		return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: fieldValue}}, nil
 	}
diff --git a/pkg/query/aggregation/aggregation.go b/pkg/query/aggregation/aggregation.go
index a0836f9..a581e67 100644
--- a/pkg/query/aggregation/aggregation.go
+++ b/pkg/query/aggregation/aggregation.go
@@ -26,32 +26,91 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 )
 
-var errUnknownFunc = errors.New("unknown aggregation function")
+var (
+	errUnknownFunc          = errors.New("unknown aggregation function")
+	errUnSupportedFieldType = errors.New("unsupported field type")
+)
 
-// Int64Func allows to aggregate int64.
-type Int64Func interface {
-	In(int64)
-	Val() int64
+// Func supports aggregation operations.
+type Func[N Number] interface {
+	In(N)
+	Val() N
 	Reset()
 }
 
-// NewInt64Func returns a Int64Func based on function type.
-func NewInt64Func(af modelv1.AggregationFunction) (Int64Func, error) {
+// Number denotes the supported number types.
+type Number interface {
+	~int64 | ~float64
+}
+
+// NewFunc returns an aggregation function based on the function type.
+func NewFunc[N Number](af modelv1.AggregationFunction) (Func[N], error) {
+	var result Func[N]
 	switch af {
 	case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN:
-		return &meanInt64Func{}, nil
+		result = &meanFunc[N]{zero: zero[N]()}
 	case modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
-		return &countInt64Func{}, nil
+		result = &countFunc[N]{zero: zero[N]()}
 	case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
-		return &maxInt64Func{
-			val: math.MinInt64,
-		}, nil
+		result = &maxFunc[N]{min: minOf[N]()}
 	case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
-		return &minInt64Func{
-			val: math.MaxInt64,
-		}, nil
+		result = &minFunc[N]{max: maxOf[N]()}
 	case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM:
-		return &sumInt64Func{}, nil
+		result = &sumFunc[N]{zero: zero[N]()}
+	default:
+		return nil, errors.WithMessagef(errUnknownFunc, "unknown function: %s", modelv1.AggregationFunction_name[int32(af)])
+	}
+	result.Reset()
+	return result, nil
+}
+
+// FromFieldValue transforms modelv1.FieldValue to Number.
+func FromFieldValue[N Number](fieldValue *modelv1.FieldValue) (N, error) {
+	switch fieldValue.GetValue().(type) {
+	case *modelv1.FieldValue_Int:
+		return N(fieldValue.GetInt().Value), nil
+	case *modelv1.FieldValue_Float:
+		return N(fieldValue.GetFloat().Value), nil
+	}
+	return zero[N](), errUnSupportedFieldType
+}
+
+// ToFieldValue transforms Number to modelv1.FieldValue.
+func ToFieldValue[N Number](value N) (*modelv1.FieldValue, error) {
+	switch any(value).(type) {
+	case int64:
+		return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(value)}}}, nil
+	case float64:
+		return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: float64(value)}}}, nil
 	}
-	return nil, errUnknownFunc
+	return nil, errUnSupportedFieldType
+}
+
+func minOf[N Number]() (r N) {
+	switch x := any(&r).(type) {
+	case *int64:
+		*x = math.MinInt64
+	case *float64:
+		*x = -math.MaxFloat64
+	default:
+		panic("unreachable")
+	}
+	return
+}
+
+func maxOf[N Number]() (r N) {
+	switch x := any(&r).(type) {
+	case *int64:
+		*x = math.MaxInt64
+	case *float64:
+		*x = math.MaxFloat64
+	default:
+		panic("unreachable")
+	}
+	return
+}
+
+func zero[N Number]() N {
+	var z N
+	return z
 }
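
With the generic Func in place, one instantiation serves both field types; the caller picks N once and converts between FieldValue and N at the boundary. A minimal sketch matching the MIN case exercised by the new float_agg_min test below:

    package main

    import (
        "fmt"

        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
    )

    func main() {
        // NewFunc calls Reset, seeding the MIN state with math.MaxFloat64.
        minFn, err := aggregation.NewFunc[float64](modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN)
        if err != nil {
            panic(err)
        }
        minFn.In(50.2)
        minFn.In(33.333)

        // Convert the result back to a FieldValue for the query response.
        fv, err := aggregation.ToFieldValue(minFn.Val())
        if err != nil {
            panic(err)
        }
        fmt.Println(fv.GetFloat().GetValue()) // 33.333
    }
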
diff --git a/pkg/query/aggregation/function.go b/pkg/query/aggregation/function.go
index 59bdbc8..faa019d 100644
--- a/pkg/query/aggregation/function.go
+++ b/pkg/query/aggregation/function.go
@@ -17,23 +17,20 @@
 
 package aggregation
 
-import "math"
-
-var _ Int64Func = (*meanInt64Func)(nil)
-
-type meanInt64Func struct {
-	sum   int64
-	count int64
+type meanFunc[N Number] struct {
+	sum   N
+	count N
+	zero  N
 }
 
-func (m *meanInt64Func) In(val int64) {
+func (m *meanFunc[N]) In(val N) {
 	m.sum += val
 	m.count++
 }
 
-func (m meanInt64Func) Val() int64 {
-	if m.count == 0 {
-		return 0
+func (m meanFunc[N]) Val() N {
+	if m.count == m.zero {
+		return m.zero
 	}
 	v := m.sum / m.count
 	if v < 1 {
@@ -42,83 +39,79 @@ func (m meanInt64Func) Val() int64 {
 	return v
 }
 
-func (m *meanInt64Func) Reset() {
-	m.sum = 0
-	m.count = 0
+func (m *meanFunc[N]) Reset() {
+	m.sum = m.zero
+	m.count = m.zero
 }
 
-var _ Int64Func = (*countInt64Func)(nil)
-
-type countInt64Func struct {
-	count int64
+type countFunc[N Number] struct {
+	count N
+	zero  N
 }
 
-func (c *countInt64Func) In(_ int64) {
+func (c *countFunc[N]) In(_ N) {
 	c.count++
 }
 
-func (c countInt64Func) Val() int64 {
+func (c countFunc[N]) Val() N {
 	return c.count
 }
 
-func (c *countInt64Func) Reset() {
-	c.count = 0
+func (c *countFunc[N]) Reset() {
+	c.count = c.zero
 }
 
-var _ Int64Func = (*sumInt64Func)(nil)
-
-type sumInt64Func struct {
-	sum int64
+type sumFunc[N Number] struct {
+	sum  N
+	zero N
 }
 
-func (s *sumInt64Func) In(val int64) {
+func (s *sumFunc[N]) In(val N) {
 	s.sum += val
 }
 
-func (s sumInt64Func) Val() int64 {
+func (s sumFunc[N]) Val() N {
 	return s.sum
 }
 
-func (s *sumInt64Func) Reset() {
-	s.sum = 0
+func (s *sumFunc[N]) Reset() {
+	s.sum = s.zero
 }
 
-var _ Int64Func = (*maxInt64Func)(nil)
-
-type maxInt64Func struct {
-	val int64
+type maxFunc[N Number] struct {
+	val N
+	min N
 }
 
-func (m *maxInt64Func) In(val int64) {
+func (m *maxFunc[N]) In(val N) {
 	if val > m.val {
 		m.val = val
 	}
 }
 
-func (m maxInt64Func) Val() int64 {
+func (m maxFunc[N]) Val() N {
 	return m.val
 }
 
-func (m *maxInt64Func) Reset() {
-	m.val = math.MinInt64
+func (m *maxFunc[N]) Reset() {
+	m.val = m.min
 }
 
-var _ Int64Func = (*minInt64Func)(nil)
-
-type minInt64Func struct {
-	val int64
+type minFunc[N Number] struct {
+	val N
+	max N
 }
 
-func (m *minInt64Func) In(val int64) {
+func (m *minFunc[N]) In(val N) {
 	if val < m.val {
 		m.val = val
 	}
 }
 
-func (m minInt64Func) Val() int64 {
+func (m minFunc[N]) Val() N {
 	return m.val
 }
 
-func (m *minInt64Func) Reset() {
-	m.val = math.MaxInt64
+func (m *minFunc[N]) Reset() {
+	m.val = m.max
 }
diff --git a/pkg/query/logical/measure/measure_plan_aggregation.go b/pkg/query/logical/measure/measure_plan_aggregation.go
index 9c35adc..f9fa5cf 100644
--- a/pkg/query/logical/measure/measure_plan_aggregation.go
+++ b/pkg/query/logical/measure/measure_plan_aggregation.go
@@ -21,7 +21,9 @@ import (
 	"fmt"
 
 	"github.com/pkg/errors"
+	"go.uber.org/multierr"
 
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/pkg/query/aggregation"
@@ -31,7 +33,8 @@ import (
 
 var (
 	_ logical.UnresolvedPlan = (*unresolvedAggregation)(nil)
-	_ logical.Plan           = (*aggregationPlan)(nil)
+
+	errUnsupportedAggregationField = errors.New("unsupported aggregation operation on this field")
 )
 
 type unresolvedAggregation struct {
@@ -63,47 +66,61 @@ func (gba *unresolvedAggregation) Analyze(measureSchema logical.Schema) (logical
 	if len(aggregationFieldRefs) == 0 {
 		return nil, errors.Wrap(errFieldNotDefined, "aggregation schema")
 	}
-	aggrFunc, err := aggregation.NewInt64Func(gba.aggrFunc)
+	fieldRef := aggregationFieldRefs[0]
+	switch fieldRef.Spec.Spec.FieldType {
+	case databasev1.FieldType_FIELD_TYPE_INT:
+		return newAggregationPlan[int64](gba, prevPlan, measureSchema, fieldRef)
+	case databasev1.FieldType_FIELD_TYPE_FLOAT:
+		return newAggregationPlan[float64](gba, prevPlan, measureSchema, fieldRef)
+	default:
+		return nil, errors.WithMessagef(errUnsupportedAggregationField, "field: %s", fieldRef.Spec.Spec)
+	}
+}
+
+type aggregationPlan[N aggregation.Number] struct {
+	*logical.Parent
+	schema              logical.Schema
+	aggregationFieldRef *logical.FieldRef
+	aggrFunc            aggregation.Func[N]
+	aggrType            modelv1.AggregationFunction
+	isGroup             bool
+}
+
+func newAggregationPlan[N aggregation.Number](gba *unresolvedAggregation, prevPlan logical.Plan,
+	measureSchema logical.Schema, fieldRef *logical.FieldRef,
+) (*aggregationPlan[N], error) {
+	aggrFunc, err := aggregation.NewFunc[N](gba.aggrFunc)
 	if err != nil {
 		return nil, err
 	}
-	return &aggregationPlan{
+	return &aggregationPlan[N]{
 		Parent: &logical.Parent{
 			UnresolvedInput: gba.unresolvedInput,
 			Input:           prevPlan,
 		},
 		schema:              measureSchema,
 		aggrFunc:            aggrFunc,
-		aggregationFieldRef: aggregationFieldRefs[0],
+		aggregationFieldRef: fieldRef,
 		isGroup:             gba.isGroup,
 	}, nil
 }
 
-type aggregationPlan struct {
-	*logical.Parent
-	schema              logical.Schema
-	aggregationFieldRef *logical.FieldRef
-	aggrFunc            aggregation.Int64Func
-	aggrType            modelv1.AggregationFunction
-	isGroup             bool
-}
-
-func (g *aggregationPlan) String() string {
+func (g *aggregationPlan[N]) String() string {
 	return fmt.Sprintf("%s aggregation: aggregation{type=%d,field=%s}",
 		g.Input,
 		g.aggrType,
 		g.aggregationFieldRef.Field.Name)
 }
 
-func (g *aggregationPlan) Children() []logical.Plan {
+func (g *aggregationPlan[N]) Children() []logical.Plan {
 	return []logical.Plan{g.Input}
 }
 
-func (g *aggregationPlan) Schema() logical.Schema {
+func (g *aggregationPlan[N]) Schema() logical.Schema {
 	return g.schema.ProjFields(g.aggregationFieldRef)
 }
 
-func (g *aggregationPlan) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) {
+func (g *aggregationPlan[N]) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) {
 	iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec)
 	if err != nil {
 		return nil, err
@@ -114,38 +131,49 @@ func (g *aggregationPlan) Execute(ec executor.MeasureExecutionContext) (executor
 	return newAggAllIterator(iter, g.aggregationFieldRef, g.aggrFunc), nil
 }
 
-type aggGroupIterator struct {
+type aggGroupIterator[N aggregation.Number] struct {
 	prev                executor.MIterator
 	aggregationFieldRef *logical.FieldRef
-	aggrFunc            aggregation.Int64Func
+	aggrFunc            aggregation.Func[N]
+
+	err error
 }
 
-func newAggGroupMIterator(
+func newAggGroupMIterator[N aggregation.Number](
 	prev executor.MIterator,
 	aggregationFieldRef *logical.FieldRef,
-	aggrFunc aggregation.Int64Func,
+	aggrFunc aggregation.Func[N],
 ) executor.MIterator {
-	return &aggGroupIterator{
+	return &aggGroupIterator[N]{
 		prev:                prev,
 		aggregationFieldRef: aggregationFieldRef,
 		aggrFunc:            aggrFunc,
 	}
 }
 
-func (ami *aggGroupIterator) Next() bool {
+func (ami *aggGroupIterator[N]) Next() bool {
+	if ami.err != nil {
+		return false
+	}
 	return ami.prev.Next()
 }
 
-func (ami *aggGroupIterator) Current() []*measurev1.DataPoint {
+func (ami *aggGroupIterator[N]) Current() []*measurev1.DataPoint {
+	if ami.err != nil {
+		return nil
+	}
 	ami.aggrFunc.Reset()
 	group := ami.prev.Current()
 	var resultDp *measurev1.DataPoint
 	for _, dp := range group {
 		value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
-			GetValue().
-			GetInt().
 			GetValue()
-		ami.aggrFunc.In(value)
+		v, err := aggregation.FromFieldValue[N](value)
+		if err != nil {
+			ami.err = err
+			return nil
+		}
+		ami.aggrFunc.In(v)
 		if resultDp != nil {
 			continue
 		}
@@ -156,47 +184,47 @@ func (ami *aggGroupIterator) Current() []*measurev1.DataPoint {
 	if resultDp == nil {
 		return nil
 	}
+	val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
+	if err != nil {
+		ami.err = err
+		return nil
+	}
 	resultDp.Fields = []*measurev1.DataPoint_Field{
 		{
-			Name: ami.aggregationFieldRef.Field.Name,
-			Value: &modelv1.FieldValue{
-				Value: &modelv1.FieldValue_Int{
-					Int: &modelv1.Int{
-						Value: ami.aggrFunc.Val(),
-					},
-				},
-			},
+			Name:  ami.aggregationFieldRef.Field.Name,
+			Value: val,
 		},
 	}
 	return []*measurev1.DataPoint{resultDp}
 }
 
-func (ami *aggGroupIterator) Close() error {
-	return ami.prev.Close()
+func (ami *aggGroupIterator[N]) Close() error {
+	return multierr.Combine(ami.err, ami.prev.Close())
 }
 
-type aggAllIterator struct {
+type aggAllIterator[N aggregation.Number] struct {
 	prev                executor.MIterator
 	aggregationFieldRef *logical.FieldRef
-	aggrFunc            aggregation.Int64Func
+	aggrFunc            aggregation.Func[N]
 
 	result *measurev1.DataPoint
+	err    error
 }
 
-func newAggAllIterator(
+func newAggAllIterator[N aggregation.Number](
 	prev executor.MIterator,
 	aggregationFieldRef *logical.FieldRef,
-	aggrFunc aggregation.Int64Func,
+	aggrFunc aggregation.Func[N],
 ) executor.MIterator {
-	return &aggAllIterator{
+	return &aggAllIterator[N]{
 		prev:                prev,
 		aggregationFieldRef: aggregationFieldRef,
 		aggrFunc:            aggrFunc,
 	}
 }
 
-func (ami *aggAllIterator) Next() bool {
-	if ami.result != nil {
+func (ami *aggAllIterator[N]) Next() bool {
+	if ami.result != nil || ami.err != nil {
 		return false
 	}
 	defer ami.prev.Close()
@@ -205,10 +233,13 @@ func (ami *aggAllIterator) Next() bool {
 		group := ami.prev.Current()
 		for _, dp := range group {
 			value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
-				GetValue().
-				GetInt().
 				GetValue()
-			ami.aggrFunc.In(value)
+			v, err := aggregation.FromFieldValue[N](value)
+			if err != nil {
+				ami.err = err
+				return false
+			}
+			ami.aggrFunc.In(v)
 			if resultDp != nil {
 				continue
 			}
@@ -220,29 +251,28 @@ func (ami *aggAllIterator) Next() bool {
 	if resultDp == nil {
 		return false
 	}
+	val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
+	if err != nil {
+		ami.err = err
+		return false
+	}
 	resultDp.Fields = []*measurev1.DataPoint_Field{
 		{
-			Name: ami.aggregationFieldRef.Field.Name,
-			Value: &modelv1.FieldValue{
-				Value: &modelv1.FieldValue_Int{
-					Int: &modelv1.Int{
-						Value: ami.aggrFunc.Val(),
-					},
-				},
-			},
+			Name:  ami.aggregationFieldRef.Field.Name,
+			Value: val,
 		},
 	}
 	ami.result = resultDp
 	return true
 }
 
-func (ami *aggAllIterator) Current() []*measurev1.DataPoint {
+func (ami *aggAllIterator[N]) Current() []*measurev1.DataPoint {
 	if ami.result == nil {
 		return nil
 	}
 	return []*measurev1.DataPoint{ami.result}
 }
 
-func (ami *aggAllIterator) Close() error {
+func (ami *aggAllIterator[N]) Close() error {
 	return nil
 }
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index e217103..c2f6a90 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -25,6 +25,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/onsi/ginkgo/v2"
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 	"google.golang.org/protobuf/types/known/timestamppb"
@@ -520,6 +521,7 @@ func (g *group) LoadResource(name string) (Resource, bool) {
 }
 
 func (g *group) notify(resource Resource, action databasev1.Action) error {
+	defer ginkgo.GinkgoRecover()
 	now := time.Now()
 	nowPb := timestamppb.New(now)
 	entityLocator := resource.EntityLocator()
diff --git a/pkg/test/measure/testdata/measures/instance_clr_cpu_minute.json b/pkg/test/measure/testdata/measures/instance_clr_cpu_minute.json
new file mode 100644
index 0000000..121770f
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/instance_clr_cpu_minute.json
@@ -0,0 +1,50 @@
+{
+  "entity": {
+    "tagNames": [
+      "entity_id"
+    ]
+  },
+  "fields": [
+    {
+      "compressionMethod": "COMPRESSION_METHOD_ZSTD",
+      "encodingMethod": "ENCODING_METHOD_GORILLA",
+      "fieldType": "FIELD_TYPE_FLOAT",
+      "name": "summation"
+    },
+    {
+      "compressionMethod": "COMPRESSION_METHOD_ZSTD",
+      "encodingMethod": "ENCODING_METHOD_GORILLA",
+      "fieldType": "FIELD_TYPE_INT",
+      "name": "count"
+    },
+    {
+      "compressionMethod": "COMPRESSION_METHOD_ZSTD",
+      "encodingMethod": "ENCODING_METHOD_GORILLA",
+      "fieldType": "FIELD_TYPE_FLOAT",
+      "name": "value"
+    }
+  ],
+  "interval": "1m",
+  "metadata": {
+    "group": "sw_metric",
+    "name": "instance_clr_cpu_minute"
+  },
+  "tagFamilies": [
+    {
+      "name": "default",
+      "tags": [
+        {
+          "indexedOnly": false,
+          "name": "service_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "indexedOnly": false,
+          "name": "entity_id",
+          "type": "TAG_TYPE_STRING"
+        }
+      ]
+    }
+  ],
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/cases/measure/data/input/float.yaml b/test/cases/measure/data/input/float.yaml
new file mode 100644
index 0000000..73e71f1
--- /dev/null
+++ b/test/cases/measure/data/input/float.yaml
@@ -0,0 +1,26 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+metadata:
+  name: "instance_clr_cpu_minute"
+  group: "sw_metric"
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["service_id", "entity_id"]
+fieldProjection:
+  names: ["summation", "count", "value"]
diff --git a/test/cases/measure/data/input/float_agg_min.yaml b/test/cases/measure/data/input/float_agg_min.yaml
new file mode 100644
index 0000000..5762a01
--- /dev/null
+++ b/test/cases/measure/data/input/float_agg_min.yaml
@@ -0,0 +1,29 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+metadata:
+  name: "instance_clr_cpu_minute"
+  group: "sw_metric"
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["service_id", "entity_id"]
+fieldProjection:
+  names: ["summation", "count", "value"]
+agg:
+  function: "AGGREGATION_FUNCTION_MIN"
+  fieldName: "value"
\ No newline at end of file
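
[editor's note] The aggregation case asks for the minimum of the float field "value". As a minimal illustration of the semantics under test (plain Go, not the actual pkg/query/aggregation API):

    package main

    import "fmt"

    // min64 returns the smallest of the given float64 values, i.e. what
    // AGGREGATION_FUNCTION_MIN computes over the projected "value" field.
    func min64(values []float64) float64 {
        m := values[0]
        for _, v := range values[1:] {
            if v < m {
                m = v
            }
        }
        return m
    }

    func main() {
        // the three "value" points from instance_clr_cpu_minute_data.json below
        fmt.Println(min64([]float64{18.1, 50.2, 33.333})) // 18.1
    }

The minimum, 18.1, belongs to entity_1, which is why the expected output in want/float_agg_min.yaml further down contains only that row.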
diff --git a/test/cases/measure/data/testdata/instance_clr_cpu_minute_data.json b/test/cases/measure/data/testdata/instance_clr_cpu_minute_data.json
new file mode 100644
index 0000000..e767391
--- /dev/null
+++ b/test/cases/measure/data/testdata/instance_clr_cpu_minute_data.json
@@ -0,0 +1,107 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "float": {
+          "value": 18.1
+        }
+      },
+      {
+        "int": {
+          "value": 1
+        }
+      },
+      {
+        "float": {
+          "value": 18.1
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "4"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "float": {
+          "value": 100.4
+        }
+      },
+      {
+        "int": {
+          "value": 2
+        }
+      },
+      {
+        "float": {
+          "value": 50.2
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "5"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "float": {
+          "value": 100.0
+        }
+      },
+      {
+        "int": {
+          "value": 3
+        }
+      },
+      {
+        "float": {
+          "value": 33.333
+        }
+      }
+    ]
+  }
+]
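
[editor's note] Each "float" object above lands in the new oneof member of FieldValue (field 5 in model/v1/common.proto). A sketch of constructing the same value with the generated Go types; the modelv1 import alias and path follow the repository's codegen conventions and are assumed here:

    package main

    import (
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
    )

    // floatField wraps a float64 in the FieldValue oneof, matching the
    // {"float": {"value": 18.1}} entries in the JSON test data above.
    func floatField(v float64) *modelv1.FieldValue {
        return &modelv1.FieldValue{
            Value: &modelv1.FieldValue_Float{
                Float: &modelv1.Float{Value: v},
            },
        }
    }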
diff --git a/test/cases/measure/data/want/float.yaml b/test/cases/measure/data/want/float.yaml
new file mode 100644
index 0000000..a0af118
--- /dev/null
+++ b/test/cases/measure/data/want/float.yaml
@@ -0,0 +1,93 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+  - name: summation
+    value:
+      float:
+        value: 18.1
+  - name: count
+    value:
+      int:
+        value: "1"
+  - name: value
+    value:
+      float:
+        value: 18.1
+  tagFamilies:
+  - name: default
+    tags:
+    - key: service_id
+      value:
+        str:
+          value: "1"
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+  timestamp: "2023-01-10T14:14:00Z"
+- fields:
+  - name: summation
+    value:
+      float:
+        value: 100.4
+  - name: count
+    value:
+      int:
+        value: "2"
+  - name: value
+    value:
+      float:
+        value: 50.2
+  tagFamilies:
+  - name: default
+    tags:
+    - key: service_id
+      value:
+        str:
+          value: "4"
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
+  timestamp: "2023-01-10T14:15:00Z"
+- fields:
+  - name: summation
+    value:
+      float:
+        value: 100
+  - name: count
+    value:
+      int:
+        value: "3"
+  - name: value
+    value:
+      float:
+        value: 33.333
+  tagFamilies:
+  - name: default
+    tags:
+    - key: service_id
+      value:
+        str:
+          value: "5"
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
+  timestamp: "2023-01-10T14:16:00Z"
\ No newline at end of file
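
[editor's note] Two quirks in the expected YAML are protojson artifacts, not data changes: proto3 serializes 64-bit integers as JSON strings (hence the quoted count values such as "3"), and a double with no fractional part is printed without one (hence 100 rather than 100.0 for the third summation).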
diff --git a/test/cases/measure/data/want/float_agg_min.yaml b/test/cases/measure/data/want/float_agg_min.yaml
new file mode 100644
index 0000000..63ae130
--- /dev/null
+++ b/test/cases/measure/data/want/float_agg_min.yaml
@@ -0,0 +1,34 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+  - name: value
+    value:
+      float:
+        value: 18.1
+  tagFamilies:
+  - name: default
+    tags:
+    - key: service_id
+      value:
+        str:
+          value: "1"
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
\ No newline at end of file
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index dfc5cee..4db0b05 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -61,4 +61,6 @@ var _ = g.DescribeTable("Scanning Measures", verify,
 	g.Entry("invalid logical expression", helpers.Args{Input: "err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}),
 	g.Entry("linked or expressions", helpers.Args{Input: "linked_or", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("In and not In expressions", helpers.Args{Input: "in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 )
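
[editor's note] Judging by the files added in this commit, each Entry's Input key selects a pair of fixtures: data/input/<name>.yaml supplies the query and data/want/<name>.yaml the expected response, so these two entries wire up the float projection and MIN aggregation cases above. The 25-minute window offset by -20 minutes keeps the three minute-interval data points, written at the suite's base time, inside the queried range.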
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 4c2e9f3..46f30c5 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -32,11 +32,11 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 	"github.com/apache/skywalking-banyandb/pkg/test/setup"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
-	casesMeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
-	casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
-	casesStream "github.com/apache/skywalking-banyandb/test/cases/stream"
-	casesStreamData "github.com/apache/skywalking-banyandb/test/cases/stream/data"
-	casesTopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+	casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+	casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+	casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
+	casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
 )
 
 func TestIntegrationColdQuery(t *testing.T) {
@@ -65,27 +65,28 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	ns := timestamp.NowMilli().UnixNano()
 	now = time.Unix(0, ns-ns%int64(time.Minute)).Add(-time.Hour * 24)
 	interval := 500 * time.Millisecond
-	casesStreamData.Write(conn, "data.json", now, interval)
+	casesstreamdata.Write(conn, "data.json", now, interval)
 	interval = time.Minute
-	casesMeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
-	casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
-	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+	casesmeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
+	casesmeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
+	casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
+	casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+	casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+	casesmeasureData.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
 }, func(address []byte) {
 	var err error
 	connection, err = grpchelper.Conn(string(address), 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
-	casesStream.SharedContext = helpers.SharedContext{
+	casesstream.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
-	casesMeasure.SharedContext = helpers.SharedContext{
+	casesmeasure.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
-	casesTopn.SharedContext = helpers.SharedContext{
+	casestopn.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 5861a58..4b820e7 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -32,11 +32,11 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 	"github.com/apache/skywalking-banyandb/pkg/test/setup"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
-	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
-	cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
-	cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
-	cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data"
-	cases_topn "github.com/apache/skywalking-banyandb/test/cases/topn"
+	casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+	casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+	casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
+	casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
 )
 
 func TestIntegrationQuery(t *testing.T) {
@@ -65,29 +65,30 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	now = time.Unix(0, ns-ns%int64(time.Minute))
 	interval := 500 * time.Millisecond
 	// stream
-	cases_stream_data.Write(conn, "data.json", now, interval)
+	casesstreamdata.Write(conn, "data.json", now, interval)
 	// measure
 	interval = time.Minute
-	cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+	casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
+	casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
+	casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
+	casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+	casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+	casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
 }, func(address []byte) {
 	var err error
 	connection, err = grpchelper.Conn(string(address), 10*time.Second,
 		grpc.WithTransportCredentials(insecure.NewCredentials()))
-	cases_stream.SharedContext = helpers.SharedContext{
+	casesstream.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
-	cases_measure.SharedContext = helpers.SharedContext{
+	casesmeasure.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
-	cases_topn.SharedContext = helpers.SharedContext{
+	casestopn.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}