You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/01/14 04:43:32 UTC
[skywalking-banyandb] branch main updated: Support 64-bit float field (#240)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 5308398 Support 64-bit float field (#240)
5308398 is described below
commit 530839850da6ce4e2735ededbf7b13fb9fa389ba
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Jan 14 12:43:26 2023 +0800
Support 64-bit float field (#240)
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
api/proto/banyandb/database/v1/schema.proto | 1 +
api/proto/banyandb/model/v1/common.proto | 5 +
banyand/measure/encode.go | 4 +-
banyand/measure/measure_suite_test.go | 4 +-
banyand/measure/measure_write.go | 2 +
banyand/measure/metadata_test.go | 6 +-
banyand/query/processor_topn.go | 4 +-
docs/api-reference.md | 18 +
pkg/convert/number.go | 13 +
pkg/encoding/{int.go => encoder.go} | 94 +++--
pkg/encoding/encoder_test.go | 392 +++++++++++++++++++++
pkg/encoding/int_test.go | 251 -------------
pkg/pb/v1/metadata.go | 6 +-
pkg/pb/v1/write.go | 9 +
pkg/query/aggregation/aggregation.go | 93 ++++-
pkg/query/aggregation/function.go | 85 ++---
.../logical/measure/measure_plan_aggregation.go | 144 +++++---
pkg/schema/metadata.go | 2 +
.../testdata/measures/instance_clr_cpu_minute.json | 50 +++
test/cases/measure/data/input/float.yaml | 26 ++
test/cases/measure/data/input/float_agg_min.yaml | 29 ++
.../testdata/instance_clr_cpu_minute_data.json | 107 ++++++
test/cases/measure/data/want/float.yaml | 93 +++++
test/cases/measure/data/want/float_agg_min.yaml | 34 ++
test/cases/measure/measure.go | 2 +
test/integration/cold_query/query_suite_test.go | 29 +-
test/integration/query/query_suite_test.go | 29 +-
27 files changed, 1074 insertions(+), 458 deletions(-)
diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index 90555ad..929c77b 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -72,6 +72,7 @@ enum FieldType {
FIELD_TYPE_STRING = 1;
FIELD_TYPE_INT = 2;
FIELD_TYPE_DATA_BINARY = 3;
+ FIELD_TYPE_FLOAT = 4;
}
enum EncodingMethod {
diff --git a/api/proto/banyandb/model/v1/common.proto b/api/proto/banyandb/model/v1/common.proto
index d3cf5e6..806eb0a 100644
--- a/api/proto/banyandb/model/v1/common.proto
+++ b/api/proto/banyandb/model/v1/common.proto
@@ -36,6 +36,10 @@ message Int {
int64 value = 1;
}
+message Float {
+ double value = 1;
+}
+
message StrArray {
repeated string value = 1;
}
@@ -66,6 +70,7 @@ message FieldValue {
model.v1.Str str = 2;
model.v1.Int int = 3;
bytes binary_data = 4;
+ model.v1.Float float = 5;
}
}
diff --git a/banyand/measure/encode.go b/banyand/measure/encode.go
index d446828..452fbf9 100644
--- a/banyand/measure/encode.go
+++ b/banyand/measure/encode.go
@@ -45,7 +45,7 @@ type encoderPool struct {
func newEncoderPool(name string, size int, l *logger.Logger) encoding.SeriesEncoderPool {
return &encoderPool{
- intPool: encoding.NewIntEncoderPool(name, size, intervalFn),
+ intPool: encoding.NewEncoderPool(name, size, intervalFn),
l: l,
}
}
@@ -72,7 +72,7 @@ type decoderPool struct {
func newDecoderPool(name string, size int, l *logger.Logger) encoding.SeriesDecoderPool {
return &decoderPool{
- intPool: encoding.NewIntDecoderPool(name, size, intervalFn),
+ intPool: encoding.NewDecoderPool(name, size, intervalFn),
l: l,
}
}
diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go
index 8e33370..05f53a8 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -75,8 +75,8 @@ func setUp() (*services, func()) {
repo := discovery.NewMockServiceRepo(ctrl)
repo.EXPECT().NodeID().AnyTimes()
// Both PreRun and Serve phases send events
- repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 8)
- repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2)
+ repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
+ repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
// Init Pipeline
pipeline, err := queue.NewQueue(context.TODO(), repo)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index a29c535..b91d5a0 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -177,6 +177,8 @@ func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
switch fieldValue.GetValue().(type) {
case *modelv1.FieldValue_Int:
return convert.Int64ToBytes(fieldValue.GetInt().GetValue())
+ case *modelv1.FieldValue_Float:
+ return convert.Float64ToBytes(fieldValue.GetFloat().GetValue())
case *modelv1.FieldValue_Str:
return []byte(fieldValue.GetStr().Value)
case *modelv1.FieldValue_BinaryData:
diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go
index 9236c72..66c83c0 100644
--- a/banyand/measure/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -65,8 +65,8 @@ var _ = Describe("Metadata", func() {
})
It("should add shards", func() {
- svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
- svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
+ svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).AnyTimes()
+ svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
groupSchema, err := svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "sw_metric")
Expect(err).ShouldNot(HaveOccurred())
Expect(groupSchema).ShouldNot(BeNil())
@@ -126,7 +126,7 @@ var _ = Describe("Metadata", func() {
})
It("should update a new measure", func() {
- svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
+ svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
// Remove the first tag from the entity
measureSchema.Entity.TagNames = measureSchema.Entity.TagNames[1:]
entitySize := len(measureSchema.Entity.TagNames)
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 59abc23..e72520a 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -236,7 +236,7 @@ func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request *measurev1.T
var _ heap.Interface = (*postAggregationProcessor)(nil)
type aggregatorItem struct {
- int64Func aggregation.Int64Func
+ int64Func aggregation.Func[int64]
key string
index int
}
@@ -324,7 +324,7 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
return nil
}
- aggrFunc, err := aggregation.NewInt64Func(aggr.aggrFunc)
+ aggrFunc, err := aggregation.NewFunc[int64](aggr.aggrFunc)
if err != nil {
return err
}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 2c5839b..e969269 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -25,6 +25,7 @@
- [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto)
- [FieldValue](#banyandb-model-v1-FieldValue)
+ - [Float](#banyandb-model-v1-Float)
- [ID](#banyandb-model-v1-ID)
- [Int](#banyandb-model-v1-Int)
- [IntArray](#banyandb-model-v1-IntArray)
@@ -479,6 +480,22 @@ Metadata is for multi-tenant, multi-model use
| str | [Str](#banyandb-model-v1-Str) | | |
| int | [Int](#banyandb-model-v1-Int) | | |
| binary_data | [bytes](#bytes) | | |
+| float | [Float](#banyandb-model-v1-Float) | | |
+
+
+
+
+
+
+<a name="banyandb-model-v1-Float"></a>
+
+### Float
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| value | [double](#double) | | |
@@ -1073,6 +1090,7 @@ TopNAggregation generates offline TopN statistics for a measure's TopN appro
| FIELD_TYPE_STRING | 1 | |
| FIELD_TYPE_INT | 2 | |
| FIELD_TYPE_DATA_BINARY | 3 | |
+| FIELD_TYPE_FLOAT | 4 | |
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index f94c252..0074c58 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -19,6 +19,7 @@ package convert
import (
"encoding/binary"
+ "math"
)
// Uint64ToBytes converts uint64 to bytes.
@@ -74,3 +75,15 @@ func BytesToUint64(b []byte) uint64 {
func BytesToUint32(b []byte) uint32 {
return binary.BigEndian.Uint32(b)
}
+
+// Float64ToBytes converts float64 to bytes.
+func Float64ToBytes(f float64) []byte {
+ bs := make([]byte, 8)
+ binary.BigEndian.PutUint64(bs, math.Float64bits(f))
+ return bs
+}
+
+// BytesToFloat64 converts bytes to float64.
+func BytesToFloat64(b []byte) float64 {
+ return math.Float64frombits(binary.BigEndian.Uint64(b))
+}
diff --git a/pkg/encoding/int.go b/pkg/encoding/encoder.go
similarity index 68%
rename from pkg/encoding/int.go
rename to pkg/encoding/encoder.go
index ed81b72..f5c4838 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/encoder.go
@@ -20,46 +20,48 @@ package encoding
import (
"bytes"
"encoding/binary"
- "errors"
"io"
"sync"
"time"
+ "github.com/pkg/errors"
+
"github.com/apache/skywalking-banyandb/pkg/convert"
)
var (
- intEncoderPool = sync.Pool{
- New: newIntEncoder,
+ encoderPool = sync.Pool{
+ New: newEncoder,
}
- intDecoderPool = sync.Pool{
+ decoderPool = sync.Pool{
New: func() interface{} {
- return &intDecoder{}
+ return &decoder{}
},
}
errInvalidValue = errors.New("invalid encoded value")
+ errNoData = errors.New("there is no data")
)
-type intEncoderPoolDelegator struct {
+type encoderPoolDelegator struct {
pool *sync.Pool
fn ParseInterval
name string
size int
}
-// NewIntEncoderPool returns a SeriesEncoderPool which provides int-based xor encoders.
-func NewIntEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool {
- return &intEncoderPoolDelegator{
+// NewEncoderPool returns a SeriesEncoderPool which provides xor encoders for fixed-width numeric values (int64 and float64).
+func NewEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool {
+ return &encoderPoolDelegator{
name: name,
- pool: &intEncoderPool,
+ pool: &encoderPool,
size: size,
fn: fn,
}
}
-func (b *intEncoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) SeriesEncoder {
- encoder := b.pool.Get().(*intEncoder)
+func (b *encoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) SeriesEncoder {
+ encoder := b.pool.Get().(*encoder)
encoder.name = b.name
encoder.size = b.size
encoder.fn = b.fn
@@ -67,51 +69,51 @@ func (b *intEncoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) Seri
return encoder
}
-func (b *intEncoderPoolDelegator) Put(encoder SeriesEncoder) {
- _, ok := encoder.(*intEncoder)
+func (b *encoderPoolDelegator) Put(seriesEncoder SeriesEncoder) {
+ _, ok := seriesEncoder.(*encoder)
if ok {
- b.pool.Put(encoder)
+ b.pool.Put(seriesEncoder)
}
}
-type intDecoderPoolDelegator struct {
+type decoderPoolDelegator struct {
pool *sync.Pool
fn ParseInterval
name string
size int
}
-// NewIntDecoderPool returns a SeriesDecoderPool which provides int-based xor decoders.
-func NewIntDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool {
- return &intDecoderPoolDelegator{
+// NewDecoderPool returns a SeriesDecoderPool which provides xor decoders for fixed-width numeric values (int64 and float64).
+func NewDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool {
+ return &decoderPoolDelegator{
name: name,
- pool: &intDecoderPool,
+ pool: &decoderPool,
size: size,
fn: fn,
}
}
-func (b *intDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
- decoder := b.pool.Get().(*intDecoder)
+func (b *decoderPoolDelegator) Get(_ []byte) SeriesDecoder {
+ decoder := b.pool.Get().(*decoder)
decoder.name = b.name
decoder.size = b.size
decoder.fn = b.fn
return decoder
}
-func (b *intDecoderPoolDelegator) Put(decoder SeriesDecoder) {
- _, ok := decoder.(*intDecoder)
+func (b *decoderPoolDelegator) Put(seriesDecoder SeriesDecoder) {
+ _, ok := seriesDecoder.(*decoder)
if ok {
- b.pool.Put(decoder)
+ b.pool.Put(seriesDecoder)
}
}
-var _ SeriesEncoder = (*intEncoder)(nil)
+var _ SeriesEncoder = (*encoder)(nil)
// ParseInterval parses the interval rule from the key in a kv pair.
type ParseInterval = func(key []byte) time.Duration
-type intEncoder struct {
+type encoder struct {
buff BufferWriter
bw *Writer
values *XOREncoder
@@ -124,15 +126,15 @@ type intEncoder struct {
size int
}
-func newIntEncoder() interface{} {
+func newEncoder() interface{} {
bw := NewWriter()
- return &intEncoder{
+ return &encoder{
bw: bw,
values: NewXOREncoder(bw),
}
}
-func (ie *intEncoder) Append(ts uint64, value []byte) {
+func (ie *encoder) Append(ts uint64, value []byte) {
if len(value) > 8 {
return
}
@@ -160,11 +162,11 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
rawSize.WithLabelValues(ie.name, "int").Add(float64(l + 8))
}
-func (ie *intEncoder) IsFull() bool {
+func (ie *encoder) IsFull() bool {
return ie.num >= ie.size
}
-func (ie *intEncoder) Reset(key []byte, buffer BufferWriter) {
+func (ie *encoder) Reset(key []byte, buffer BufferWriter) {
ie.buff = buffer
ie.bw.Reset(buffer)
ie.interval = ie.fn(key)
@@ -174,7 +176,7 @@ func (ie *intEncoder) Reset(key []byte, buffer BufferWriter) {
ie.values = NewXOREncoder(ie.bw)
}
-func (ie *intEncoder) Encode() error {
+func (ie *encoder) Encode() error {
ie.bw.Flush()
buffWriter := NewPacker(ie.buff)
buffWriter.PutUint64(ie.startTime)
@@ -184,13 +186,13 @@ func (ie *intEncoder) Encode() error {
return nil
}
-func (ie *intEncoder) StartTime() uint64 {
+func (ie *encoder) StartTime() uint64 {
return ie.startTime
}
-var _ SeriesDecoder = (*intDecoder)(nil)
+var _ SeriesDecoder = (*decoder)(nil)
-type intDecoder struct {
+type decoder struct {
fn ParseInterval
name string
area []byte
@@ -200,7 +202,7 @@ type intDecoder struct {
num int
}
-func (i *intDecoder) Decode(key, data []byte) error {
+func (i *decoder) Decode(key, data []byte) error {
if len(data) < 10 {
return errInvalidValue
}
@@ -211,28 +213,28 @@ func (i *intDecoder) Decode(key, data []byte) error {
return nil
}
-func (i intDecoder) Len() int {
+func (i decoder) Len() int {
return i.num
}
-func (i intDecoder) IsFull() bool {
+func (i decoder) IsFull() bool {
return i.num >= i.size
}
-func (i intDecoder) Get(ts uint64) ([]byte, error) {
+func (i decoder) Get(ts uint64) ([]byte, error) {
for iter := i.Iterator(); iter.Next(); {
if iter.Time() == ts {
return iter.Val(), nil
}
}
- return zeroBytes, nil
+ return nil, errors.WithMessagef(errNoData, "ts:%d", ts)
}
-func (i intDecoder) Range() (start, end uint64) {
+func (i decoder) Range() (start, end uint64) {
return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
}
-func (i intDecoder) Iterator() SeriesIterator {
+func (i decoder) Iterator() SeriesIterator {
br := NewReader(bytes.NewReader(i.area))
return &intIterator{
endTime: i.startTime + uint64(i.num*int(i.interval)),
@@ -243,11 +245,7 @@ func (i intDecoder) Iterator() SeriesIterator {
}
}
-var (
- _ SeriesIterator = (*intIterator)(nil)
- zeroBytes = convert.Uint64ToBytes(zero)
- zero = convert.BytesToUint64(convert.Int64ToBytes(0))
-)
+var _ SeriesIterator = (*intIterator)(nil)
type intIterator struct {
err error
diff --git a/pkg/encoding/encoder_test.go b/pkg/encoding/encoder_test.go
new file mode 100644
index 0000000..840a883
--- /dev/null
+++ b/pkg/encoding/encoder_test.go
@@ -0,0 +1,392 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "bytes"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func TestNewEncoderAndDecoder(t *testing.T) {
+ type tsData struct {
+ ts []uint64
+ data []any
+ start uint64
+ end uint64
+ }
+ tests := []struct {
+ name string
+ args tsData
+ want tsData
+ }{
+ {
+ name: "int golden path",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, 8, 7, 9},
+ },
+ want: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "int more than the size",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, 8, 7, 9, 6},
+ },
+ want: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "int less than the size",
+ args: tsData{
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []any{7, 8, 7},
+ },
+ want: tsData{
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []any{7, 8, 7},
+ start: uint64(time.Minute),
+ end: uint64(3 * time.Minute),
+ },
+ },
+ {
+ name: "int empty slot in the middle",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
+ data: []any{7, 9},
+ },
+ want: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "float64 golden path",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7.0, 8.0, 7.0, 9.0},
+ },
+ want: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7.0, 8.0, 7.0, 9.0},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "float64 more than the size",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{0.7, 0.8, 0.7, 0.9, 0.6},
+ },
+ want: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{0.7, 0.8, 0.7, 0.9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "float64 less than the size",
+ args: tsData{
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []any{1.7, 1.8, 1.7},
+ },
+ want: tsData{
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []any{1.7, 1.8, 1.7},
+ start: uint64(time.Minute),
+ end: uint64(3 * time.Minute),
+ },
+ },
+ {
+ name: "float64 empty slot in the middle",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
+ data: []any{0.700033, 0.988822},
+ },
+ want: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{0.700033, 0.988822},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ }
+ key := []byte("foo")
+ fn := func(k []byte) time.Duration {
+ assert.Equal(t, key, k)
+ return 1 * time.Minute
+ }
+ encoderPool := NewEncoderPool("minute", 4, fn)
+ decoderPool := NewDecoderPool("minute", 4, fn)
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ at := assert.New(t)
+ var buffer bytes.Buffer
+ encoder := encoderPool.Get(key, &buffer)
+ defer encoderPool.Put(encoder)
+ decoder := decoderPool.Get(key)
+ defer decoderPool.Put(decoder)
+ isFull := false
+ for i, v := range tt.args.ts {
+ encoder.Append(v, ToBytes(tt.args.data[i]))
+ if encoder.IsFull() {
+ isFull = true
+ break
+ }
+ }
+ err := encoder.Encode()
+ at.NoError(err)
+
+ at.Equal(tt.want.start, encoder.StartTime())
+ at.NoError(decoder.Decode(key, buffer.Bytes()))
+ start, end := decoder.Range()
+ at.Equal(tt.want.start, start)
+ at.Equal(tt.want.end, end)
+ if isFull {
+ at.True(decoder.IsFull())
+ }
+ i := 0
+ for iter := decoder.Iterator(); iter.Next(); i++ {
+ at.NoError(iter.Error())
+ at.Equal(tt.want.ts[i], iter.Time())
+ at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], iter.Val()))
+ v, err := decoder.Get(iter.Time())
+ at.NoError(err)
+ at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], v))
+ }
+ at.Equal(len(tt.want.ts), i)
+ })
+ }
+}
+
+func ToBytes(v any) []byte {
+ switch d := v.(type) {
+ case int:
+ return convert.Int64ToBytes(int64(d))
+ case float64:
+ return convert.Float64ToBytes(d)
+ }
+ return nil
+}
+
+func BytesTo(t any, b []byte) any {
+ switch t.(type) {
+ case int:
+ return int(convert.BytesToInt64(b))
+ case float64:
+ return convert.BytesToFloat64(b)
+ }
+ return nil
+}
+
+func TestNewDecoderGet(t *testing.T) {
+ type tsData struct {
+ ts []uint64
+ data []any
+ }
+ type wantData struct {
+ ts []uint64
+ data []any
+ wantErr []bool
+ start uint64
+ end uint64
+ }
+ tests := []struct {
+ name string
+ args tsData
+ want wantData
+ }{
+ {
+ name: "int golden path",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, 8, 7, 9},
+ },
+ want: wantData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, 8, 7, 9},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "int more than the size",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, 8, 7, 9, 6},
+ },
+ want: wantData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+ data: []any{7, 8, 7, 9, nil},
+ wantErr: []bool{false, false, false, false, true},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "int less than the size",
+ args: tsData{
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []any{7, 8, 7},
+ },
+ want: wantData{
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []any{7, 8, 7},
+ start: uint64(time.Minute),
+ end: uint64(3 * time.Minute),
+ },
+ },
+ {
+ name: "int empty slot in the middle",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
+ data: []any{7, 9},
+ },
+ want: wantData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7, nil, nil, 9},
+ wantErr: []bool{false, true, true, false},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "float golden path",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7.0, 8.0, 7.0, 9.0},
+ },
+ want: wantData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{7.0, 8.0, 7.0, 9.0},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "float more than the size",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{1.7, 1.8, 1.7, 1.9, 1.6},
+ },
+ want: wantData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+ data: []any{1.7, 1.8, 1.7, 1.9, nil},
+ wantErr: []bool{false, false, false, false, true},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ {
+ name: "float less than the size",
+ args: tsData{
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []any{0.71, 0.833, 0.709},
+ },
+ want: wantData{
+ ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+ data: []any{0.71, 0.833, 0.709},
+ start: uint64(time.Minute),
+ end: uint64(3 * time.Minute),
+ },
+ },
+ {
+ name: "float empty slot in the middle",
+ args: tsData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
+ data: []any{1.7, 1.9},
+ },
+ want: wantData{
+ ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+ data: []any{1.7, nil, nil, 1.9},
+ wantErr: []bool{false, true, true, false},
+ start: uint64(time.Minute),
+ end: uint64(4 * time.Minute),
+ },
+ },
+ }
+ key := []byte("foo")
+ fn := func(k []byte) time.Duration {
+ assert.Equal(t, key, k)
+ return 1 * time.Minute
+ }
+ encoderPool := NewEncoderPool("minute", 4, fn)
+ decoderPool := NewDecoderPool("minute", 4, fn)
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ at := assert.New(t)
+ var buffer bytes.Buffer
+ encoder := encoderPool.Get(key, &buffer)
+ defer encoderPool.Put(encoder)
+ decoder := decoderPool.Get(key)
+ defer decoderPool.Put(decoder)
+ isFull := false
+ for i, v := range tt.args.ts {
+ encoder.Append(v, ToBytes(tt.args.data[i]))
+ if encoder.IsFull() {
+ isFull = true
+ break
+ }
+ }
+ err := encoder.Encode()
+ at.NoError(err)
+
+ at.Equal(tt.want.start, encoder.StartTime())
+ at.NoError(decoder.Decode(key, buffer.Bytes()))
+ start, end := decoder.Range()
+ at.Equal(tt.want.start, start)
+ at.Equal(tt.want.end, end)
+ if isFull {
+ at.True(decoder.IsFull())
+ }
+ for i, t := range tt.want.ts {
+ wantErr := false
+ if tt.want.wantErr != nil {
+ wantErr = tt.want.wantErr[i]
+ }
+ v, err := decoder.Get(t)
+ if wantErr {
+ at.ErrorIs(err, errNoData)
+ } else {
+ at.NoError(err)
+ at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], v))
+ }
+ }
+ })
+ }
+}
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
deleted file mode 100644
index 82767ea..0000000
--- a/pkg/encoding/int_test.go
+++ /dev/null
@@ -1,251 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package encoding
-
-import (
- "bytes"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/apache/skywalking-banyandb/pkg/convert"
-)
-
-func TestNewIntEncoderAndDecoder(t *testing.T) {
- type tsData struct {
- ts []uint64
- data []int64
- start uint64
- end uint64
- }
- tests := []struct {
- name string
- args tsData
- want tsData
- }{
- {
- name: "golden path",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 8, 7, 9},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 8, 7, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "more than the size",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 8, 7, 9, 6},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 8, 7, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "less than the size",
- args: tsData{
- ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
- data: []int64{7, 8, 7},
- },
- want: tsData{
- ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
- data: []int64{7, 8, 7},
- start: uint64(time.Minute),
- end: uint64(3 * time.Minute),
- },
- },
- {
- name: "empty slot in the middle",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
- data: []int64{7, 9},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- }
- key := []byte("foo")
- fn := func(k []byte) time.Duration {
- assert.Equal(t, key, k)
- return 1 * time.Minute
- }
- encoderPool := NewIntEncoderPool("minute", 4, fn)
- decoderPool := NewIntDecoderPool("minute", 4, fn)
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- at := assert.New(t)
- var buffer bytes.Buffer
- encoder := encoderPool.Get(key, &buffer)
- defer encoderPool.Put(encoder)
- decoder := decoderPool.Get(key)
- defer decoderPool.Put(decoder)
- isFull := false
- for i, v := range tt.args.ts {
- encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
- if encoder.IsFull() {
- isFull = true
- break
- }
- }
- err := encoder.Encode()
- at.NoError(err)
-
- at.Equal(tt.want.start, encoder.StartTime())
- at.NoError(decoder.Decode(key, buffer.Bytes()))
- start, end := decoder.Range()
- at.Equal(tt.want.start, start)
- at.Equal(tt.want.end, end)
- if isFull {
- at.True(decoder.IsFull())
- }
- i := 0
- for iter := decoder.Iterator(); iter.Next(); i++ {
- at.NoError(iter.Error())
- at.Equal(tt.want.ts[i], iter.Time())
- at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
- v, err := decoder.Get(iter.Time())
- at.NoError(err)
- at.Equal(tt.want.data[i], convert.BytesToInt64(v))
- }
- at.Equal(len(tt.want.ts), i)
- })
- }
-}
-
-func TestNewIntDecoderGet(t *testing.T) {
- type tsData struct {
- ts []uint64
- data []int64
- start uint64
- end uint64
- }
- tests := []struct {
- name string
- args tsData
- want tsData
- }{
- {
- name: "golden path",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 8, 7, 9},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 8, 7, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "more than the size",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 8, 7, 9, 6},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
- data: []int64{7, 8, 7, 9, 0},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "less than the size",
- args: tsData{
- ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
- data: []int64{7, 8, 7},
- },
- want: tsData{
- ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
- data: []int64{7, 8, 7},
- start: uint64(time.Minute),
- end: uint64(3 * time.Minute),
- },
- },
- {
- name: "empty slot in the middle",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
- data: []int64{7, 9},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []int64{7, 0, 0, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- }
- key := []byte("foo")
- fn := func(k []byte) time.Duration {
- assert.Equal(t, key, k)
- return 1 * time.Minute
- }
- encoderPool := NewIntEncoderPool("minute", 4, fn)
- decoderPool := NewIntDecoderPool("minute", 4, fn)
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- at := assert.New(t)
- var buffer bytes.Buffer
- encoder := encoderPool.Get(key, &buffer)
- defer encoderPool.Put(encoder)
- decoder := decoderPool.Get(key)
- defer decoderPool.Put(decoder)
- isFull := false
- for i, v := range tt.args.ts {
- encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
- if encoder.IsFull() {
- isFull = true
- break
- }
- }
- err := encoder.Encode()
- at.NoError(err)
-
- at.Equal(tt.want.start, encoder.StartTime())
- at.NoError(decoder.Decode(key, buffer.Bytes()))
- start, end := decoder.Range()
- at.Equal(tt.want.start, start)
- at.Equal(tt.want.end, end)
- if isFull {
- at.True(decoder.IsFull())
- }
- for i, t := range tt.want.ts {
- v, err := decoder.Get(t)
- at.NoError(err)
- at.Equal(tt.want.data[i], convert.BytesToInt64(v))
- }
- })
- }
-}
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 7de4753..373c0e4 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -57,10 +57,12 @@ func tagValueTypeConv(tagValue *modelv1.TagValue) (tagType databasev1.TagType, i
}
// FieldValueTypeConv recognizes the field type from its value.
-func FieldValueTypeConv(tagValue *modelv1.FieldValue) (tagType databasev1.FieldType, isNull bool) {
- switch tagValue.GetValue().(type) {
+func FieldValueTypeConv(fieldValue *modelv1.FieldValue) (tagType databasev1.FieldType, isNull bool) {
+ switch fieldValue.GetValue().(type) {
case *modelv1.FieldValue_Int:
return databasev1.FieldType_FIELD_TYPE_INT, false
+ case *modelv1.FieldValue_Float:
+ return databasev1.FieldType_FIELD_TYPE_FLOAT, false
case *modelv1.FieldValue_Str:
return databasev1.FieldType_FIELD_TYPE_STRING, false
case *modelv1.FieldValue_BinaryData:
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index 03c470b..600df45 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -190,6 +190,15 @@ func DecodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) (*mode
hex.EncodeToString(fieldValue), len(fieldValue))
}
return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: convert.BytesToInt64(fieldValue)}}}, nil
+ case databasev1.FieldType_FIELD_TYPE_FLOAT:
+ if len(fieldValue) == 0 {
+ return zeroFieldValue, nil
+ }
+ if len(fieldValue) != 8 {
+ return nil, errors.WithMessagef(errMalformedField, "the length of encoded field value(float64) %s is %d, less than 8",
+ hex.EncodeToString(fieldValue), len(fieldValue))
+ }
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: convert.BytesToFloat64(fieldValue)}}}, nil
case databasev1.FieldType_FIELD_TYPE_DATA_BINARY:
return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: fieldValue}}, nil
}
diff --git a/pkg/query/aggregation/aggregation.go b/pkg/query/aggregation/aggregation.go
index a0836f9..a581e67 100644
--- a/pkg/query/aggregation/aggregation.go
+++ b/pkg/query/aggregation/aggregation.go
@@ -26,32 +26,91 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)
-var errUnknownFunc = errors.New("unknown aggregation function")
+var (
+ errUnknownFunc = errors.New("unknown aggregation function")
+ errUnSupportedFieldType = errors.New("unsupported field type")
+)
-// Int64Func allows to aggregate int64.
-type Int64Func interface {
- In(int64)
- Val() int64
+// Func supports aggregation operations.
+type Func[N Number] interface {
+ In(N)
+ Val() N
Reset()
}
-// NewInt64Func returns a Int64Func based on function type.
-func NewInt64Func(af modelv1.AggregationFunction) (Int64Func, error) {
+// Number denotes the supported number types.
+type Number interface {
+ ~int64 | ~float64
+}
+
+// NewFunc returns an aggregation function based on function type.
+func NewFunc[N Number](af modelv1.AggregationFunction) (Func[N], error) {
+ var result Func[N]
switch af {
case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN:
- return &meanInt64Func{}, nil
+ result = &meanFunc[N]{zero: zero[N]()}
case modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
- return &countInt64Func{}, nil
+ result = &countFunc[N]{zero: zero[N]()}
case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
- return &maxInt64Func{
- val: math.MinInt64,
- }, nil
+ result = &maxFunc[N]{min: minOf[N]()}
case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
- return &minInt64Func{
- val: math.MaxInt64,
- }, nil
+ result = &minFunc[N]{max: maxOf[N]()}
case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM:
- return &sumInt64Func{}, nil
+ result = &sumFunc[N]{zero: zero[N]()}
+ default:
+ return nil, errors.WithMessagef(errUnknownFunc, "unknown function:%s", modelv1.AggregationFunction_name[int32(af)])
+ }
+ result.Reset()
+ return result, nil
+}
+
+// FromFieldValue transforms modelv1.FieldValue to Number.
+func FromFieldValue[N Number](fieldValue *modelv1.FieldValue) (N, error) {
+ switch fieldValue.GetValue().(type) {
+ case *modelv1.FieldValue_Int:
+ return N(fieldValue.GetInt().Value), nil
+ case *modelv1.FieldValue_Float:
+ return N(fieldValue.GetFloat().Value), nil
+ }
+ return zero[N](), errUnSupportedFieldType
+}
+
+// ToFieldValue transforms Number to modelv1.FieldValue.
+func ToFieldValue[N Number](value N) (*modelv1.FieldValue, error) {
+ switch any(value).(type) {
+ case int64:
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(value)}}}, nil
+ case float64:
+ return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: float64(value)}}}, nil
}
- return nil, errUnknownFunc
+ return nil, errUnSupportedFieldType
+}
+
+func minOf[N Number]() (r N) {
+ switch x := any(&r).(type) {
+ case *int64:
+ *x = math.MinInt64
+ case *float64:
+ *x = -math.MaxFloat64
+ default:
+ panic("unreachable")
+ }
+ return
+}
+
+func maxOf[N Number]() (r N) {
+ switch x := any(&r).(type) {
+ case *int64:
+ *x = math.MaxInt64
+ case *float64:
+ *x = math.MaxFloat64
+ default:
+ panic("unreachable")
+ }
+ return
+}
+
+func zero[N Number]() N {
+ var z N
+ return z
}
diff --git a/pkg/query/aggregation/function.go b/pkg/query/aggregation/function.go
index 59bdbc8..faa019d 100644
--- a/pkg/query/aggregation/function.go
+++ b/pkg/query/aggregation/function.go
@@ -17,23 +17,20 @@
package aggregation
-import "math"
-
-var _ Int64Func = (*meanInt64Func)(nil)
-
-type meanInt64Func struct {
- sum int64
- count int64
+type meanFunc[N Number] struct {
+ sum N
+ count N
+ zero N
}
-func (m *meanInt64Func) In(val int64) {
+func (m *meanFunc[N]) In(val N) {
m.sum += val
m.count++
}
-func (m meanInt64Func) Val() int64 {
- if m.count == 0 {
- return 0
+func (m meanFunc[N]) Val() N {
+ if m.count == m.zero {
+ return m.zero
}
v := m.sum / m.count
if v < 1 {
@@ -42,83 +39,79 @@ func (m meanInt64Func) Val() int64 {
return v
}
-func (m *meanInt64Func) Reset() {
- m.sum = 0
- m.count = 0
+func (m *meanFunc[N]) Reset() {
+ m.sum = m.zero
+ m.count = m.zero
}
-var _ Int64Func = (*countInt64Func)(nil)
-
-type countInt64Func struct {
- count int64
+type countFunc[N Number] struct {
+ count N
+ zero N
}
-func (c *countInt64Func) In(_ int64) {
+func (c *countFunc[N]) In(_ N) {
c.count++
}
-func (c countInt64Func) Val() int64 {
+func (c countFunc[N]) Val() N {
return c.count
}
-func (c *countInt64Func) Reset() {
- c.count = 0
+func (c *countFunc[N]) Reset() {
+ c.count = c.zero
}
-var _ Int64Func = (*sumInt64Func)(nil)
-
-type sumInt64Func struct {
- sum int64
+type sumFunc[N Number] struct {
+ sum N
+ zero N
}
-func (s *sumInt64Func) In(val int64) {
+func (s *sumFunc[N]) In(val N) {
s.sum += val
}
-func (s sumInt64Func) Val() int64 {
+func (s sumFunc[N]) Val() N {
return s.sum
}
-func (s *sumInt64Func) Reset() {
- s.sum = 0
+func (s *sumFunc[N]) Reset() {
+ s.sum = s.zero
}
-var _ Int64Func = (*maxInt64Func)(nil)
-
-type maxInt64Func struct {
- val int64
+type maxFunc[N Number] struct {
+ val N
+ min N
}
-func (m *maxInt64Func) In(val int64) {
+func (m *maxFunc[N]) In(val N) {
if val > m.val {
m.val = val
}
}
-func (m maxInt64Func) Val() int64 {
+func (m maxFunc[N]) Val() N {
return m.val
}
-func (m *maxInt64Func) Reset() {
- m.val = math.MinInt64
+func (m *maxFunc[N]) Reset() {
+ m.val = m.min
}
-var _ Int64Func = (*minInt64Func)(nil)
-
-type minInt64Func struct {
- val int64
+type minFunc[N Number] struct {
+ val N
+ max N
}
-func (m *minInt64Func) In(val int64) {
+func (m *minFunc[N]) In(val N) {
if val < m.val {
m.val = val
}
}
-func (m minInt64Func) Val() int64 {
+func (m minFunc[N]) Val() N {
return m.val
}
-func (m *minInt64Func) Reset() {
- m.val = math.MaxInt64
+func (m *minFunc[N]) Reset() {
+ m.val = m.max
}
diff --git a/pkg/query/logical/measure/measure_plan_aggregation.go b/pkg/query/logical/measure/measure_plan_aggregation.go
index 9c35adc..f9fa5cf 100644
--- a/pkg/query/logical/measure/measure_plan_aggregation.go
+++ b/pkg/query/logical/measure/measure_plan_aggregation.go
@@ -21,7 +21,9 @@ import (
"fmt"
"github.com/pkg/errors"
+ "go.uber.org/multierr"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/query/aggregation"
@@ -31,7 +33,8 @@ import (
var (
_ logical.UnresolvedPlan = (*unresolvedAggregation)(nil)
- _ logical.Plan = (*aggregationPlan)(nil)
+
+ errUnsupportedAggregationField = errors.New("unsupported aggregation operation on this field")
)
type unresolvedAggregation struct {
@@ -63,47 +66,61 @@ func (gba *unresolvedAggregation) Analyze(measureSchema logical.Schema) (logical
if len(aggregationFieldRefs) == 0 {
return nil, errors.Wrap(errFieldNotDefined, "aggregation schema")
}
- aggrFunc, err := aggregation.NewInt64Func(gba.aggrFunc)
+ fieldRef := aggregationFieldRefs[0]
+ switch fieldRef.Spec.Spec.FieldType {
+ case databasev1.FieldType_FIELD_TYPE_INT:
+ return newAggregationPlan[int64](gba, prevPlan, measureSchema, fieldRef)
+ case databasev1.FieldType_FIELD_TYPE_FLOAT:
+ return newAggregationPlan[float64](gba, prevPlan, measureSchema, fieldRef)
+ default:
+ return nil, errors.WithMessagef(errUnsupportedAggregationField, "field: %s", fieldRef.Spec.Spec)
+ }
+}
+
+type aggregationPlan[N aggregation.Number] struct {
+ *logical.Parent
+ schema logical.Schema
+ aggregationFieldRef *logical.FieldRef
+ aggrFunc aggregation.Func[N]
+ aggrType modelv1.AggregationFunction
+ isGroup bool
+}
+
+func newAggregationPlan[N aggregation.Number](gba *unresolvedAggregation, prevPlan logical.Plan,
+ measureSchema logical.Schema, fieldRef *logical.FieldRef,
+) (*aggregationPlan[N], error) {
+ aggrFunc, err := aggregation.NewFunc[N](gba.aggrFunc)
if err != nil {
return nil, err
}
- return &aggregationPlan{
+ return &aggregationPlan[N]{
Parent: &logical.Parent{
UnresolvedInput: gba.unresolvedInput,
Input: prevPlan,
},
schema: measureSchema,
aggrFunc: aggrFunc,
- aggregationFieldRef: aggregationFieldRefs[0],
+ aggregationFieldRef: fieldRef,
isGroup: gba.isGroup,
}, nil
}
-type aggregationPlan struct {
- *logical.Parent
- schema logical.Schema
- aggregationFieldRef *logical.FieldRef
- aggrFunc aggregation.Int64Func
- aggrType modelv1.AggregationFunction
- isGroup bool
-}
-
-func (g *aggregationPlan) String() string {
+func (g *aggregationPlan[N]) String() string {
return fmt.Sprintf("%s aggregation: aggregation{type=%d,field=%s}",
g.Input,
g.aggrType,
g.aggregationFieldRef.Field.Name)
}
-func (g *aggregationPlan) Children() []logical.Plan {
+func (g *aggregationPlan[N]) Children() []logical.Plan {
return []logical.Plan{g.Input}
}
-func (g *aggregationPlan) Schema() logical.Schema {
+func (g *aggregationPlan[N]) Schema() logical.Schema {
return g.schema.ProjFields(g.aggregationFieldRef)
}
-func (g *aggregationPlan) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) {
+func (g *aggregationPlan[N]) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) {
iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec)
if err != nil {
return nil, err
@@ -114,38 +131,49 @@ func (g *aggregationPlan) Execute(ec executor.MeasureExecutionContext) (executor
return newAggAllIterator(iter, g.aggregationFieldRef, g.aggrFunc), nil
}
-type aggGroupIterator struct {
+type aggGroupIterator[N aggregation.Number] struct {
prev executor.MIterator
aggregationFieldRef *logical.FieldRef
- aggrFunc aggregation.Int64Func
+ aggrFunc aggregation.Func[N]
+
+ err error
}
-func newAggGroupMIterator(
+func newAggGroupMIterator[N aggregation.Number](
prev executor.MIterator,
aggregationFieldRef *logical.FieldRef,
- aggrFunc aggregation.Int64Func,
+ aggrFunc aggregation.Func[N],
) executor.MIterator {
- return &aggGroupIterator{
+ return &aggGroupIterator[N]{
prev: prev,
aggregationFieldRef: aggregationFieldRef,
aggrFunc: aggrFunc,
}
}
-func (ami *aggGroupIterator) Next() bool {
+func (ami *aggGroupIterator[N]) Next() bool {
+ if ami.err != nil {
+ return false
+ }
return ami.prev.Next()
}
-func (ami *aggGroupIterator) Current() []*measurev1.DataPoint {
+func (ami *aggGroupIterator[N]) Current() []*measurev1.DataPoint {
+ if ami.err != nil {
+ return nil
+ }
ami.aggrFunc.Reset()
group := ami.prev.Current()
var resultDp *measurev1.DataPoint
for _, dp := range group {
value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
- GetValue().
- GetInt().
GetValue()
- ami.aggrFunc.In(value)
+ v, err := aggregation.FromFieldValue[N](value)
+ if err != nil {
+ ami.err = err
+ return nil
+ }
+ ami.aggrFunc.In(v)
if resultDp != nil {
continue
}
@@ -156,47 +184,47 @@ func (ami *aggGroupIterator) Current() []*measurev1.DataPoint {
if resultDp == nil {
return nil
}
+ val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
+ if err != nil {
+ ami.err = err
+ return nil
+ }
resultDp.Fields = []*measurev1.DataPoint_Field{
{
- Name: ami.aggregationFieldRef.Field.Name,
- Value: &modelv1.FieldValue{
- Value: &modelv1.FieldValue_Int{
- Int: &modelv1.Int{
- Value: ami.aggrFunc.Val(),
- },
- },
- },
+ Name: ami.aggregationFieldRef.Field.Name,
+ Value: val,
},
}
return []*measurev1.DataPoint{resultDp}
}
-func (ami *aggGroupIterator) Close() error {
- return ami.prev.Close()
+func (ami *aggGroupIterator[N]) Close() error {
+ return multierr.Combine(ami.err, ami.prev.Close())
}
-type aggAllIterator struct {
+type aggAllIterator[N aggregation.Number] struct {
prev executor.MIterator
aggregationFieldRef *logical.FieldRef
- aggrFunc aggregation.Int64Func
+ aggrFunc aggregation.Func[N]
result *measurev1.DataPoint
+ err error
}
-func newAggAllIterator(
+func newAggAllIterator[N aggregation.Number](
prev executor.MIterator,
aggregationFieldRef *logical.FieldRef,
- aggrFunc aggregation.Int64Func,
+ aggrFunc aggregation.Func[N],
) executor.MIterator {
- return &aggAllIterator{
+ return &aggAllIterator[N]{
prev: prev,
aggregationFieldRef: aggregationFieldRef,
aggrFunc: aggrFunc,
}
}
-func (ami *aggAllIterator) Next() bool {
- if ami.result != nil {
+func (ami *aggAllIterator[N]) Next() bool {
+ if ami.result != nil || ami.err != nil {
return false
}
defer ami.prev.Close()
@@ -205,10 +233,13 @@ func (ami *aggAllIterator) Next() bool {
group := ami.prev.Current()
for _, dp := range group {
value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
- GetValue().
- GetInt().
GetValue()
- ami.aggrFunc.In(value)
+ v, err := aggregation.FromFieldValue[N](value)
+ if err != nil {
+ ami.err = err
+ return false
+ }
+ ami.aggrFunc.In(v)
if resultDp != nil {
continue
}
@@ -220,29 +251,28 @@ func (ami *aggAllIterator) Next() bool {
if resultDp == nil {
return false
}
+ val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
+ if err != nil {
+ ami.err = err
+ return false
+ }
resultDp.Fields = []*measurev1.DataPoint_Field{
{
- Name: ami.aggregationFieldRef.Field.Name,
- Value: &modelv1.FieldValue{
- Value: &modelv1.FieldValue_Int{
- Int: &modelv1.Int{
- Value: ami.aggrFunc.Val(),
- },
- },
- },
+ Name: ami.aggregationFieldRef.Field.Name,
+ Value: val,
},
}
ami.result = resultDp
return true
}
-func (ami *aggAllIterator) Current() []*measurev1.DataPoint {
+func (ami *aggAllIterator[N]) Current() []*measurev1.DataPoint {
if ami.result == nil {
return nil
}
return []*measurev1.DataPoint{ami.result}
}
-func (ami *aggAllIterator) Close() error {
+func (ami *aggAllIterator[N]) Close() error {
return nil
}
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index e217103..c2f6a90 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"
+ "github.com/onsi/ginkgo/v2"
"github.com/pkg/errors"
"go.uber.org/multierr"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -520,6 +521,7 @@ func (g *group) LoadResource(name string) (Resource, bool) {
}
func (g *group) notify(resource Resource, action databasev1.Action) error {
+ defer ginkgo.GinkgoRecover()
now := time.Now()
nowPb := timestamppb.New(now)
entityLocator := resource.EntityLocator()
diff --git a/pkg/test/measure/testdata/measures/instance_clr_cpu_minute.json b/pkg/test/measure/testdata/measures/instance_clr_cpu_minute.json
new file mode 100644
index 0000000..121770f
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/instance_clr_cpu_minute.json
@@ -0,0 +1,50 @@
+{
+ "entity": {
+ "tagNames": [
+ "entity_id"
+ ]
+ },
+ "fields": [
+ {
+ "compressionMethod": "COMPRESSION_METHOD_ZSTD",
+ "encodingMethod": "ENCODING_METHOD_GORILLA",
+ "fieldType": "FIELD_TYPE_FLOAT",
+ "name": "summation"
+ },
+ {
+ "compressionMethod": "COMPRESSION_METHOD_ZSTD",
+ "encodingMethod": "ENCODING_METHOD_GORILLA",
+ "fieldType": "FIELD_TYPE_INT",
+ "name": "count"
+ },
+ {
+ "compressionMethod": "COMPRESSION_METHOD_ZSTD",
+ "encodingMethod": "ENCODING_METHOD_GORILLA",
+ "fieldType": "FIELD_TYPE_FLOAT",
+ "name": "value"
+ }
+ ],
+ "interval": "1m",
+ "metadata": {
+ "group": "sw_metric",
+ "name": "instance_clr_cpu_minute"
+ },
+ "tagFamilies": [
+ {
+ "name": "default",
+ "tags": [
+ {
+ "indexedOnly": false,
+ "name": "service_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "indexedOnly": false,
+ "name": "entity_id",
+ "type": "TAG_TYPE_STRING"
+ }
+ ]
+ }
+ ],
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/cases/measure/data/input/float.yaml b/test/cases/measure/data/input/float.yaml
new file mode 100644
index 0000000..73e71f1
--- /dev/null
+++ b/test/cases/measure/data/input/float.yaml
@@ -0,0 +1,26 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+metadata:
+ name: "instance_clr_cpu_minute"
+ group: "sw_metric"
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["service_id", "entity_id"]
+fieldProjection:
+ names: ["summation", "count", "value"]
diff --git a/test/cases/measure/data/input/float_agg_min.yaml b/test/cases/measure/data/input/float_agg_min.yaml
new file mode 100644
index 0000000..5762a01
--- /dev/null
+++ b/test/cases/measure/data/input/float_agg_min.yaml
@@ -0,0 +1,29 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+metadata:
+ name: "instance_clr_cpu_minute"
+ group: "sw_metric"
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["service_id", "entity_id"]
+fieldProjection:
+ names: ["summation", "count", "value"]
+agg:
+ function: "AGGREGATION_FUNCTION_MIN"
+ fieldName: "value"
\ No newline at end of file
diff --git a/test/cases/measure/data/testdata/instance_clr_cpu_minute_data.json b/test/cases/measure/data/testdata/instance_clr_cpu_minute_data.json
new file mode 100644
index 0000000..e767391
--- /dev/null
+++ b/test/cases/measure/data/testdata/instance_clr_cpu_minute_data.json
@@ -0,0 +1,107 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "float": {
+ "value": 18.1
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ },
+ {
+ "float": {
+ "value": 18.1
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "4"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "float": {
+ "value": 100.4
+ }
+ },
+ {
+ "int": {
+ "value": 2
+ }
+ },
+ {
+ "float": {
+ "value": 50.2
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "5"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "float": {
+ "value": 100.0
+ }
+ },
+ {
+ "int": {
+ "value": 3
+ }
+ },
+ {
+ "float": {
+ "value": 33.333
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/data/want/float.yaml b/test/cases/measure/data/want/float.yaml
new file mode 100644
index 0000000..a0af118
--- /dev/null
+++ b/test/cases/measure/data/want/float.yaml
@@ -0,0 +1,93 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+ - name: summation
+ value:
+ float:
+ value: 18.1
+ - name: count
+ value:
+ int:
+ value: "1"
+ - name: value
+ value:
+ float:
+ value: 18.1
+ tagFamilies:
+ - name: default
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: "1"
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
+ timestamp: "2023-01-10T14:14:00Z"
+- fields:
+ - name: summation
+ value:
+ float:
+ value: 100.4
+ - name: count
+ value:
+ int:
+ value: "2"
+ - name: value
+ value:
+ float:
+ value: 50.2
+ tagFamilies:
+ - name: default
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: "4"
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ timestamp: "2023-01-10T14:15:00Z"
+- fields:
+ - name: summation
+ value:
+ float:
+ value: 100
+ - name: count
+ value:
+ int:
+ value: "3"
+ - name: value
+ value:
+ float:
+ value: 33.333
+ tagFamilies:
+ - name: default
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: "5"
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ timestamp: "2023-01-10T14:16:00Z"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/float_agg_min.yaml b/test/cases/measure/data/want/float_agg_min.yaml
new file mode 100644
index 0000000..63ae130
--- /dev/null
+++ b/test/cases/measure/data/want/float_agg_min.yaml
@@ -0,0 +1,34 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+ - name: value
+ value:
+ float:
+ value: 18.1
+ tagFamilies:
+ - name: default
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: "1"
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
\ No newline at end of file
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index dfc5cee..4db0b05 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -61,4 +61,6 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("invalid logical expression", helpers.Args{Input: "err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}),
g.Entry("linked or expressions", helpers.Args{Input: "linked_or", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("In and not In expressions", helpers.Args{Input: "in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
)
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 4c2e9f3..46f30c5 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -32,11 +32,11 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
- casesMeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
- casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
- casesStream "github.com/apache/skywalking-banyandb/test/cases/stream"
- casesStreamData "github.com/apache/skywalking-banyandb/test/cases/stream/data"
- casesTopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+ casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+ casesmeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+ casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+ casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
+ casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
)
func TestIntegrationColdQuery(t *testing.T) {
@@ -65,27 +65,28 @@ var _ = SynchronizedBeforeSuite(func() []byte {
ns := timestamp.NowMilli().UnixNano()
now = time.Unix(0, ns-ns%int64(time.Minute)).Add(-time.Hour * 24)
interval := 500 * time.Millisecond
- casesStreamData.Write(conn, "data.json", now, interval)
+ casesstreamdata.Write(conn, "data.json", now, interval)
interval = time.Minute
- casesMeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
- casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
- casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
- casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
- casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+ casesmeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
+ casesmeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
+ casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
+ casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+ casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+ casesmeasureData.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
Expect(conn.Close()).To(Succeed())
return []byte(addr)
}, func(address []byte) {
var err error
connection, err = grpchelper.Conn(string(address), 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
- casesStream.SharedContext = helpers.SharedContext{
+ casesstream.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
- casesMeasure.SharedContext = helpers.SharedContext{
+ casesmeasure.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
- casesTopn.SharedContext = helpers.SharedContext{
+ casestopn.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 5861a58..4b820e7 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -32,11 +32,11 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
- cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
- cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
- cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
- cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data"
- cases_topn "github.com/apache/skywalking-banyandb/test/cases/topn"
+ casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+ casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+ casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+ casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
+ casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
)
func TestIntegrationQuery(t *testing.T) {
@@ -65,29 +65,30 @@ var _ = SynchronizedBeforeSuite(func() []byte {
now = time.Unix(0, ns-ns%int64(time.Minute))
interval := 500 * time.Millisecond
// stream
- cases_stream_data.Write(conn, "data.json", now, interval)
+ casesstreamdata.Write(conn, "data.json", now, interval)
// measure
interval = time.Minute
- cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
- cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
- cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
- cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
- cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+ casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
+ casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
+ casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
+ casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+ casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+ casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
Expect(conn.Close()).To(Succeed())
return []byte(addr)
}, func(address []byte) {
var err error
connection, err = grpchelper.Conn(string(address), 10*time.Second,
grpc.WithTransportCredentials(insecure.NewCredentials()))
- cases_stream.SharedContext = helpers.SharedContext{
+ casesstream.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
- cases_measure.SharedContext = helpers.SharedContext{
+ casesmeasure.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
- cases_topn.SharedContext = helpers.SharedContext{
+ casestopn.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}