You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/26 08:19:10 UTC
[skywalking-banyandb] 01/01: Introduce gorilla encoder to measure
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch patch-path
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7b32b8894ec7d2d6f4bdbe69776d98210ca7fc84
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 26 08:11:42 2022 +0000
Introduce gorilla encoder to measure
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.gitignore | 3 +
api/proto/banyandb/database/v1/schema.pb.go | 1 +
api/proto/banyandb/database/v1/schema.proto | 1 +
banyand/liaison/grpc/measure_test.go | 17 +--
banyand/liaison/grpc/stream_test.go | 15 ++-
banyand/measure/encode.go | 123 +++++++++++++++++++++
banyand/measure/field_flag_test.go | 39 +++++++
banyand/measure/measure.go | 13 ++-
banyand/measure/measure_query.go | 4 +-
banyand/measure/measure_write.go | 19 +---
banyand/measure/metadata.go | 5 +-
banyand/stream/stream_query.go | 2 +-
banyand/stream/stream_write.go | 2 +-
banyand/tsdb/series_seek.go | 6 +-
banyand/tsdb/series_write.go | 2 +-
banyand/tsdb/seriesdb.go | 10 +-
banyand/tsdb/seriesdb_test.go | 110 +++++++++---------
pkg/bit/writer.go | 10 +-
pkg/encoding/int.go | 117 ++++++++++++++++----
pkg/encoding/int_test.go | 123 +++++++++++++++++++++
pkg/encoding/plain.go | 34 +++---
.../measure/testdata/measures/service_cpm_day.json | 2 +-
.../measures/service_instance_cpm_day.json | 2 +-
pkg/timestamp/duration.go | 53 +++++++++
pkg/timestamp/duration_test.go | 60 ++++++++++
25 files changed, 636 insertions(+), 137 deletions(-)
diff --git a/.gitignore b/.gitignore
index 1b3617e..2e3a8ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,6 @@ build/release/
# mock files
*mock.go
*mock_test.go
+
+# etcd unix sockets
+localhost:*
diff --git a/api/proto/banyandb/database/v1/schema.pb.go b/api/proto/banyandb/database/v1/schema.pb.go
index e9b503a..b49fbef 100644
--- a/api/proto/banyandb/database/v1/schema.pb.go
+++ b/api/proto/banyandb/database/v1/schema.pb.go
@@ -671,6 +671,7 @@ type Measure struct {
// entity indicates which tags will be to generate a series and shard a measure
Entity *Entity `protobuf:"bytes,4,opt,name=entity,proto3" json:"entity,omitempty"`
// interval indicates how frequently to send a data point
+ // valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
Interval string `protobuf:"bytes,5,opt,name=interval,proto3" json:"interval,omitempty"`
// updated_at indicates when the measure is updated
UpdatedAt *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"`
diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index 41df79d..86b9cad 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -103,6 +103,7 @@ message Measure {
// entity indicates which tags will be to generate a series and shard a measure
Entity entity = 4;
// interval indicates how frequently to send a data point
+ // valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
string interval = 5;
// updated_at indicates when the measure is updated
google.protobuf.Timestamp updated_at = 6;
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index 27fd27d..22289cc 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -38,19 +38,21 @@ import (
)
var _ = Describe("Measure", func() {
- var rootPath, metadataPath string
- var gracefulStop, deferRootFunc, deferMetadataFunc func()
+ var streamPath, measurePath, metadataPath string
+ var gracefulStop, deferStreamFunc, deferMeasureFunc, deferMetadataFunc func()
var conn *grpclib.ClientConn
BeforeEach(func() {
var err error
- rootPath, deferRootFunc, err = test.NewSpace()
+ streamPath, deferStreamFunc, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ measurePath, deferMeasureFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
metadataPath, deferMetadataFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
})
- It("is a plain server", func() {
+ FIt("is a plain server", func() {
By("Verifying an empty server")
- flags := []string{"--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ flags := []string{"--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
gracefulStop = setup(flags)
var err error
conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
@@ -75,7 +77,7 @@ var _ = Describe("Measure", func() {
}, defaultEventallyTimeout).Should(Equal(1))
})
It("is a TLS server", func() {
- flags := []string{"--tls=true", "--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ flags := []string{"--tls=true", "--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
_, currentFile, _, _ := runtime.Caller(0)
basePath := filepath.Dir(currentFile)
certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -98,7 +100,8 @@ var _ = Describe("Measure", func() {
_ = conn.Close()
gracefulStop()
deferMetadataFunc()
- deferRootFunc()
+ deferStreamFunc()
+ deferMeasureFunc()
})
})
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 8f02215..5e52e1b 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -45,19 +45,21 @@ import (
)
var _ = Describe("Stream", func() {
- var rootPath, metadataPath string
- var gracefulStop, deferRootFunc, deferMetadataFunc func()
+ var streamPath, measurePath, metadataPath string
+ var gracefulStop, deferStreamFunc, deferMeasureFunc, deferMetadataFunc func()
var conn *grpclib.ClientConn
BeforeEach(func() {
var err error
- rootPath, deferRootFunc, err = test.NewSpace()
+ streamPath, deferStreamFunc, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ measurePath, deferMeasureFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
metadataPath, deferMetadataFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
})
It("is a plain server", func() {
By("Verifying an empty server")
- flags := []string{"--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ flags := []string{"--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
gracefulStop = setup(flags)
var err error
conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
@@ -82,7 +84,7 @@ var _ = Describe("Stream", func() {
}, defaultEventallyTimeout).Should(Equal(1))
})
It("is a TLS server", func() {
- flags := []string{"--tls=true", "--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ flags := []string{"--tls=true", "--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
_, currentFile, _, _ := runtime.Caller(0)
basePath := filepath.Dir(currentFile)
certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -105,7 +107,8 @@ var _ = Describe("Stream", func() {
_ = conn.Close()
gracefulStop()
deferMetadataFunc()
- deferRootFunc()
+ deferStreamFunc()
+ deferMeasureFunc()
})
})
diff --git a/banyand/measure/encode.go b/banyand/measure/encode.go
new file mode 100644
index 0000000..3d6fcf3
--- /dev/null
+++ b/banyand/measure/encode.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+ "time"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+ _ encoding.SeriesEncoderPool = (*encoderPool)(nil)
+ _ encoding.SeriesDecoderPool = (*decoderPool)(nil)
+ intervalFn = func(key []byte) time.Duration {
+ _, interval, err := decodeFieldFlag(key)
+ if err != nil {
+ panic(err)
+ }
+ return interval
+ }
+)
+
+type encoderPool struct {
+ intPool encoding.SeriesEncoderPool
+ defaultPool encoding.SeriesEncoderPool
+ l *logger.Logger
+}
+
+func newEncoderPool(size int, l *logger.Logger) encoding.SeriesEncoderPool {
+ return &encoderPool{
+ intPool: encoding.NewIntEncoderPool(size, intervalFn),
+ defaultPool: encoding.NewPlainEncoderPool(size),
+ l: l,
+ }
+}
+
+func (p *encoderPool) Get(metadata []byte) encoding.SeriesEncoder {
+ fieldSpec, _, err := decodeFieldFlag(metadata)
+ if err != nil {
+ p.l.Err(err).Msg("failed to decode field flag")
+ return p.defaultPool.Get(metadata)
+ }
+ if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
+ return p.intPool.Get(metadata)
+ }
+ return p.defaultPool.Get(metadata)
+}
+
+func (p *encoderPool) Put(encoder encoding.SeriesEncoder) {
+ p.intPool.Put(encoder)
+ p.defaultPool.Put(encoder)
+}
+
+type decoderPool struct {
+ intPool encoding.SeriesDecoderPool
+ defaultPool encoding.SeriesDecoderPool
+ l *logger.Logger
+}
+
+func newDecoderPool(size int, l *logger.Logger) encoding.SeriesDecoderPool {
+ return &decoderPool{
+ intPool: encoding.NewIntDecoderPool(size, intervalFn),
+ defaultPool: encoding.NewPlainDecoderPool(size),
+ l: l,
+ }
+}
+
+func (p *decoderPool) Get(metadata []byte) encoding.SeriesDecoder {
+ fieldSpec, _, err := decodeFieldFlag(metadata)
+ if err != nil {
+ p.l.Err(err).Msg("failed to decode field flag")
+ return p.defaultPool.Get(metadata)
+ }
+ if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
+ return p.intPool.Get(metadata)
+ }
+ return p.defaultPool.Get(metadata)
+}
+
+func (p *decoderPool) Put(decoder encoding.SeriesDecoder) {
+ p.intPool.Put(decoder)
+ p.defaultPool.Put(decoder)
+}
+
+const fieldFlagLength = 9
+
+func encoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte {
+ encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
+ compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
+ bb := make([]byte, fieldFlagLength)
+ bb[0] = encodingMethod<<4 | compressionMethod
+ copy(bb[1:], convert.Int64ToBytes(int64(interval)))
+ return bb
+}
+
+func decodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) {
+ if len(key) < fieldFlagLength {
+ return nil, 0, ErrMalformedFieldFlag
+ }
+ b := key[len(key)-9:]
+ return &databasev1.FieldSpec{
+ EncodingMethod: databasev1.EncodingMethod(int32(b[0]) >> 4),
+ CompressionMethod: databasev1.CompressionMethod((int32(b[0] & 0x0F))),
+ }, time.Duration(convert.BytesToInt64(b[1:])), nil
+}
diff --git a/banyand/measure/field_flag_test.go b/banyand/measure/field_flag_test.go
new file mode 100644
index 0000000..baec5b8
--- /dev/null
+++ b/banyand/measure/field_flag_test.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+func TestEncodeFieldFlag(t *testing.T) {
+ flag := encoderFieldFlag(&databasev1.FieldSpec{
+ EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+ CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+ }, time.Minute)
+ fieldSpec, interval, err := decodeFieldFlag(flag)
+ assert.NoError(t, err)
+ assert.Equal(t, databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, fieldSpec.EncodingMethod)
+ assert.Equal(t, databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, fieldSpec.CompressionMethod)
+ assert.Equal(t, time.Minute, interval)
+}
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 481197d..d9688c9 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -19,6 +19,7 @@ package measure
import (
"context"
+ "time"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -27,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// a chunk is 1MB
@@ -44,6 +46,7 @@ type measure struct {
entityLocator partition.EntityLocator
indexRules []*databasev1.IndexRule
indexWriter *index.Writer
+ interval time.Duration
}
func (s *measure) GetSchema() *databasev1.Measure {
@@ -70,10 +73,14 @@ func (s *measure) Close() error {
return s.indexWriter.Close()
}
-func (s *measure) parseSpec() {
+func (s *measure) parseSpec() (err error) {
s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+ if s.schema.Interval != "" {
+ s.interval, err = timestamp.ParseDuration(s.schema.Interval)
+ }
+ return err
}
type measureSpec struct {
@@ -88,7 +95,9 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
indexRules: spec.indexRules,
l: l,
}
- sm.parseSpec()
+ if err := sm.parseSpec(); err != nil {
+ return nil, err
+ }
ctx := context.WithValue(context.Background(), logger.ContextKey, l)
sm.db = db
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 4d482b2..2d9473f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -91,7 +91,7 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
}
func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
- familyRawBytes, err := item.Family(string(familyIdentity(family, TagFlag)))
+ familyRawBytes, err := item.Family(familyIdentity(family, TagFlag))
if err != nil {
return nil, err
}
@@ -132,7 +132,7 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
break
}
}
- bytes, err := item.Family(string(familyIdentity(name, encoderFieldFlag(fieldSpec))))
+ bytes, err := item.Family(familyIdentity(name, encoderFieldFlag(fieldSpec, s.interval)))
if err != nil {
return nil, err
}
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index d5f3db8..efb5ac5 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -37,11 +37,10 @@ import (
)
var (
- ErrMalformedElement = errors.New("element is malformed")
-)
+ ErrMalformedElement = errors.New("element is malformed")
+ ErrMalformedFieldFlag = errors.New("field flag is malformed")
-const (
- TagFlag byte = iota
+ TagFlag []byte = make([]byte, fieldFlagLength)
)
func (s *measure) Write(value *measurev1.DataPointValue) error {
@@ -129,7 +128,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
if data == nil {
continue
}
- builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec)), data)
+ builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec, s.interval)), data)
}
writer, errWrite := builder.Build()
if errWrite != nil {
@@ -196,8 +195,8 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
return
}
-func familyIdentity(name string, flag byte) []byte {
- return bytes.Join([][]byte{[]byte(name), {flag}}, nil)
+func familyIdentity(name string, flag []byte) []byte {
+ return bytes.Join([][]byte{tsdb.Hash([]byte(name)), flag}, nil)
}
func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
@@ -223,9 +222,3 @@ func decodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *model
}
return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
}
-
-func encoderFieldFlag(fieldSpec *databasev1.FieldSpec) byte {
- encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
- compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
- return encodingMethod<<4 | compressionMethod
-}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 0499083..14fae45 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -30,7 +30,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
- "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
@@ -205,8 +204,8 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
Location: path.Join(s.path, groupSchema.Metadata.Name),
ShardNum: groupSchema.ResourceOpts.ShardNum,
EncodingMethod: tsdb.EncodingMethod{
- EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
- DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+ EncoderPool: newEncoderPool(chunkSize, s.l),
+ DecoderPool: newDecoderPool(chunkSize, s.l),
},
})
}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 493966f..97640e8 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -88,7 +88,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
}
func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
- familyRawBytes, err := item.Family(family)
+ familyRawBytes, err := item.Family(tsdb.Hash([]byte(family)))
if err != nil {
return nil, errors.Wrapf(err, "parse family %s", family)
}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index d73130e..c8d7ac7 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -102,7 +102,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
if errMarshal != nil {
return nil, errMarshal
}
- builder.Family([]byte(sm.GetTagFamilies()[fi].GetName()), bb)
+ builder.Family(tsdb.Hash([]byte(sm.GetTagFamilies()[fi].GetName())), bb)
}
builder.Val([]byte(value.GetElementId()))
writer, errWrite := builder.Build()
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index eec6104..9051ed4 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -32,7 +32,7 @@ type Iterator interface {
}
type Item interface {
- Family(family string) ([]byte, error)
+ Family(family []byte) ([]byte, error)
Val() ([]byte, error)
ID() common.ItemID
SortedField() []byte
@@ -119,10 +119,10 @@ func (i *item) SortedField() []byte {
return i.sortedField
}
-func (i *item) Family(family string) ([]byte, error) {
+func (i *item) Family(family []byte) ([]byte, error) {
d := dataBucket{
seriesID: i.seriesID,
- family: []byte(family),
+ family: family,
}
return i.data.Get(d.marshal(), uint64(i.itemID))
}
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 9d8d97d..6a405bf 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -175,7 +175,7 @@ func (d dataBucket) marshal() []byte {
}
return bytes.Join([][]byte{
d.seriesID.Marshal(),
- hash(d.family),
+ d.family,
}, nil)
}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 4afb96f..d6ea39d 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -81,7 +81,7 @@ func NewPath(entries []Entry) Path {
p.template = append(p.template, zeroIntBytes...)
continue
}
- entry := hash(e)
+ entry := Hash(e)
if !encounterAny {
p.offset += 8
}
@@ -105,7 +105,7 @@ func (p *Path) extractPrefix() {
}
func (p Path) Prepand(entry Entry) Path {
- e := hash(entry)
+ e := Hash(entry)
var prepand = func(src []byte, entry []byte) []byte {
dst := make([]byte, len(src)+len(entry))
copy(dst, entry)
@@ -156,7 +156,7 @@ func (s *seriesDB) GetByHashKey(key []byte) (Series, error) {
}
s.Lock()
defer s.Unlock()
- seriesID = hash(key)
+ seriesID = Hash(key)
err = s.seriesMetadata.Put(key, seriesID)
if err != nil {
return nil, err
@@ -277,7 +277,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string,
func HashEntity(entity Entity) []byte {
result := make(Entry, 0, len(entity)*8)
for _, entry := range entity {
- result = append(result, hash(entry)...)
+ result = append(result, Hash(entry)...)
}
return result
}
@@ -286,7 +286,7 @@ func SeriesID(entity Entity) common.SeriesID {
return common.SeriesID(convert.Hash((HashEntity(entity))))
}
-func hash(entry []byte) []byte {
+func Hash(entry []byte) []byte {
return convert.Uint64ToBytes(convert.Hash(entry))
}
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 2838624..f7163dc 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -66,19 +66,19 @@ func TestNewPath(t *testing.T) {
want: Path{
isFull: true,
prefix: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -104,8 +104,8 @@ func TestNewPath(t *testing.T) {
}, nil),
template: bytes.Join([][]byte{
zeroIntBytes,
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
zeroIntBytes,
@@ -124,17 +124,17 @@ func TestNewPath(t *testing.T) {
},
want: Path{
prefix: bytes.Join([][]byte{
- hash([]byte("productpage")),
+ Hash([]byte("productpage")),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("productpage")),
+ Hash([]byte("productpage")),
zeroIntBytes,
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("productpage")),
+ Hash([]byte("productpage")),
zeroIntBytes,
- hash(convert.Uint64ToBytes(0)),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -153,17 +153,17 @@ func TestNewPath(t *testing.T) {
},
want: Path{
prefix: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
zeroIntBytes,
}, nil),
mask: bytes.Join([][]byte{
@@ -185,22 +185,22 @@ func TestNewPath(t *testing.T) {
want: Path{
isFull: true,
prefix: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -220,18 +220,18 @@ func TestNewPath(t *testing.T) {
},
scope: Entry("segment"),
want: Path{
- prefix: hash([]byte("segment")),
+ prefix: Hash([]byte("segment")),
seekKey: bytes.Join([][]byte{
- hash([]byte("segment")),
+ Hash([]byte("segment")),
zeroIntBytes,
zeroIntBytes,
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("segment")),
+ Hash([]byte("segment")),
zeroIntBytes,
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -252,20 +252,20 @@ func TestNewPath(t *testing.T) {
scope: Entry("segment"),
want: Path{
prefix: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
zeroIntBytes,
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
zeroIntBytes,
- hash(convert.Uint64ToBytes(0)),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -286,20 +286,20 @@ func TestNewPath(t *testing.T) {
scope: Entry("segment"),
want: Path{
prefix: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
zeroIntBytes,
}, nil),
mask: bytes.Join([][]byte{
@@ -510,7 +510,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
},
}
for _, d := range data {
- d.id = common.SeriesID(convert.BytesToUint64(hash(HashEntity(d.entity))))
+ d.id = common.SeriesID(convert.BytesToUint64(Hash(HashEntity(d.entity))))
series, err := db.Get(d.entity)
t.NoError(err)
t.Greater(uint(series.ID()), uint(0))
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
index 6302f9e..9995f01 100644
--- a/pkg/bit/writer.go
+++ b/pkg/bit/writer.go
@@ -30,14 +30,18 @@ type Writer struct {
// NewWriter create bit writer
func NewWriter(buffer *bytes.Buffer) *Writer {
- var bw Writer
+ bw := new(Writer)
bw.Reset(buffer)
- return &bw
+ return bw
}
// Reset writes to a new writer
func (w *Writer) Reset(buffer *bytes.Buffer) {
- w.out = buffer
+ if buffer == nil {
+ w.out.Reset()
+ } else {
+ w.out = buffer
+ }
w.cache = 0
w.available = 8
}
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index b8887ec..79fd69f 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -20,6 +20,9 @@ package encoding
import (
"bytes"
"encoding/binary"
+ "errors"
+ "io"
+ "sync"
"time"
"github.com/apache/skywalking-banyandb/pkg/bit"
@@ -27,6 +30,74 @@ import (
"github.com/apache/skywalking-banyandb/pkg/convert"
)
+var (
+ intEncoderPool = sync.Pool{
+ New: newIntEncoder,
+ }
+ intDecoderPool = sync.Pool{
+ New: func() interface{} {
+ return &intDecoder{}
+ },
+ }
+)
+
+type intEncoderPoolDelegator struct {
+ pool *sync.Pool
+ size int
+ fn ParseInterval
+}
+
+func NewIntEncoderPool(size int, fn ParseInterval) SeriesEncoderPool {
+ return &intEncoderPoolDelegator{
+ pool: &intEncoderPool,
+ size: size,
+ fn: fn,
+ }
+}
+
+func (b *intEncoderPoolDelegator) Get(metadata []byte) SeriesEncoder {
+ encoder := b.pool.Get().(*intEncoder)
+ encoder.size = b.size
+ encoder.fn = b.fn
+ encoder.Reset(metadata)
+ return encoder
+}
+
+func (b *intEncoderPoolDelegator) Put(encoder SeriesEncoder) {
+ _, ok := encoder.(*intEncoder)
+ if ok {
+ b.pool.Put(encoder)
+ }
+}
+
+type intDecoderPoolDelegator struct {
+ pool *sync.Pool
+ size int
+ fn ParseInterval
+}
+
+func NewIntDecoderPool(size int, fn ParseInterval) SeriesDecoderPool {
+ return &intDecoderPoolDelegator{
+ pool: &intDecoderPool,
+ size: size,
+ fn: fn,
+ }
+}
+
+func (b *intDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
+ decoder := b.pool.Get().(*intDecoder)
+ decoder.size = b.size
+ decoder.fn = b.fn
+ return decoder
+}
+
+func (b *intDecoderPoolDelegator) Put(decoder SeriesDecoder) {
+ _, ok := decoder.(*intDecoder)
+ if ok {
+ b.pool.Put(decoder)
+ }
+}
+
var (
_ SeriesEncoder = (*intEncoder)(nil)
)
@@ -40,19 +111,18 @@ type intEncoder struct {
fn ParseInterval
interval time.Duration
startTime uint64
+ prevTime uint64
num int
size int
}
-func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+func newIntEncoder() interface{} {
buff := &bytes.Buffer{}
bw := bit.NewWriter(buff)
return &intEncoder{
buff: buff,
bw: bw,
values: NewXOREncoder(bw),
- fn: fn,
- size: size,
}
}
@@ -62,8 +132,9 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
}
if ie.startTime == 0 {
ie.startTime = ts
+ ie.prevTime = ts
}
- gap := int(ts) - int(ie.startTime)
+ gap := int(ts) - int(ie.prevTime)
if gap < 0 {
return
}
@@ -72,8 +143,9 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
ie.bw.WriteBool(false)
ie.num++
}
+ ie.prevTime = ts
ie.bw.WriteBool(len(value) > 0)
- ie.values.Write(binary.LittleEndian.Uint64(value))
+ ie.values.Write(convert.BytesToUint64(value))
ie.num++
}
@@ -84,6 +156,8 @@ func (ie *intEncoder) IsFull() bool {
func (ie *intEncoder) Reset(key []byte) {
ie.bw.Reset(nil)
ie.interval = ie.fn(key)
+ ie.startTime = 0
+ ie.prevTime = 0
}
func (ie *intEncoder) Encode() ([]byte, error) {
@@ -91,7 +165,7 @@ func (ie *intEncoder) Encode() ([]byte, error) {
buffWriter := buffer.NewBufferWriter(ie.buff)
buffWriter.PutUint64(ie.startTime)
buffWriter.PutUint16(uint16(ie.size))
- return ie.buff.Bytes(), nil
+ return buffWriter.Bytes(), nil
}
func (ie *intEncoder) StartTime() uint64 {
@@ -109,14 +183,7 @@ type intDecoder struct {
area []byte
}
-func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
- return &intDecoder{
- fn: fn,
- size: size,
- }
-}
-
-func (i intDecoder) Decode(key, data []byte) error {
+func (i *intDecoder) Decode(key, data []byte) error {
i.interval = i.fn(key)
i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -148,14 +215,19 @@ func (i intDecoder) Iterator() SeriesIterator {
interval: int(i.interval),
br: br,
values: NewXORDecoder(br),
+ size: i.size,
}
}
-var _ SeriesIterator = (*intIterator)(nil)
+var (
+ _ SeriesIterator = (*intIterator)(nil)
+ zero = convert.BytesToUint64(convert.Int64ToBytes(0))
+)
type intIterator struct {
startTime uint64
interval int
+ size int
br *bit.Reader
values *XORDecoder
@@ -166,17 +238,24 @@ type intIterator struct {
}
func (i *intIterator) Next() bool {
- var b bool
- b, i.err = i.br.ReadBool()
- if i.err != nil {
+ if i.index >= i.size {
+ return false
+ }
+ b, err := i.br.ReadBool()
+ if errors.Is(err, io.EOF) {
+ return false
+ }
+ if err != nil {
+ i.err = err
return false
}
if b {
if i.values.Next() {
i.currVal = i.values.Value()
}
+ } else {
+ i.currVal = zero
}
- i.currVal = 0
i.currTime = i.startTime + uint64(i.interval*i.index)
i.index++
return true
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
new file mode 100644
index 0000000..3c686d6
--- /dev/null
+++ b/pkg/encoding/int_test.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func TestNewIntEncoderAndDecoder(t *testing.T) {
+ type tsData struct {
+ ts []uint64
+ data []int64
+ }
+ tests := []struct {
+ name string
+ args tsData
+ want tsData
+ }{
+ {
+ name: "golden path",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ },
+ },
+ {
+ name: "more than the size",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9, 6},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ },
+ },
+ {
+ name: "less than the size",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ data: []int64{7, 8, 7},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ data: []int64{7, 8, 7},
+ },
+ },
+ {
+ name: "empty slot in the middle",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 9},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 0, 0, 9},
+ },
+ },
+ }
+ key := []byte("foo")
+ fn := func(k []byte) time.Duration {
+ assert.Equal(t, key, k)
+ return 1 * time.Minute
+ }
+ encoderPool := NewIntEncoderPool(3, fn)
+ decoderPool := NewIntDecoderPool(3, fn)
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ at := assert.New(t)
+ encoder := encoderPool.Get(key)
+ decoder := decoderPool.Get(key)
+ encoder.Reset(key)
+ for i, v := range tt.args.ts {
+ encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
+ if encoder.IsFull() {
+ break
+ }
+ }
+ bb, err := encoder.Encode()
+ at.NoError(err)
+ at.NoError(decoder.Decode(key, bb))
+ at.True(decoder.IsFull())
+ iter := decoder.Iterator()
+ i := 0
+ for ; iter.Next(); i++ {
+ at.NoError(iter.Error())
+ at.Equal(tt.want.ts[i], iter.Time())
+ at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
+ v, err := decoder.Get(tt.want.ts[i])
+ at.NoError(err)
+ at.Equal(tt.want.data[i], convert.BytesToInt64(v))
+ }
+ if i == 0 {
+ at.Fail("empty data")
+ }
+ })
+ }
+}
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index e511441..626e3e9 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -31,59 +31,65 @@ import (
)
var (
- encoderPool = sync.Pool{
+ plainEncoderPool = sync.Pool{
New: newPlainEncoder,
}
- decoderPool = sync.Pool{
+ plainDecoderPool = sync.Pool{
New: func() interface{} {
return &plainDecoder{}
},
}
)
-type plainEncoderPool struct {
+type plainEncoderPoolDelegator struct {
pool *sync.Pool
size int
}
func NewPlainEncoderPool(size int) SeriesEncoderPool {
- return &plainEncoderPool{
- pool: &encoderPool,
+ return &plainEncoderPoolDelegator{
+ pool: &plainEncoderPool,
size: size,
}
}
-func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+func (b *plainEncoderPoolDelegator) Get(metadata []byte) SeriesEncoder {
encoder := b.pool.Get().(*plainEncoder)
encoder.Reset(metadata)
encoder.valueSize = b.size
return encoder
}
-func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
- b.pool.Put(encoder)
+func (b *plainEncoderPoolDelegator) Put(encoder SeriesEncoder) {
+ _, ok := encoder.(*plainEncoder)
+ if ok {
+ b.pool.Put(encoder)
+ }
}
-type plainDecoderPool struct {
+type plainDecoderPoolDelegator struct {
pool *sync.Pool
size int
}
func NewPlainDecoderPool(size int) SeriesDecoderPool {
- return &plainDecoderPool{
- pool: &decoderPool,
+ return &plainDecoderPoolDelegator{
+ pool: &plainDecoderPool,
size: size,
}
}
-func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+func (b *plainDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
decoder := b.pool.Get().(*plainDecoder)
decoder.valueSize = b.size
return decoder
}
-func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
- b.pool.Put(decoder)
+func (b *plainDecoderPoolDelegator) Put(decoder SeriesDecoder) {
+ _, ok := decoder.(*plainDecoder)
+ if ok {
+ b.pool.Put(decoder)
+ }
}
var (
diff --git a/pkg/test/measure/testdata/measures/service_cpm_day.json b/pkg/test/measure/testdata/measures/service_cpm_day.json
index 5e8bd75..0a4190d 100644
--- a/pkg/test/measure/testdata/measures/service_cpm_day.json
+++ b/pkg/test/measure/testdata/measures/service_cpm_day.json
@@ -37,6 +37,6 @@
"entity_id"
]
},
- "interval": "1d",
+ "interval": "24h",
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measures/service_instance_cpm_day.json b/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
index 9476244..36d17ce 100644
--- a/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
+++ b/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
@@ -42,6 +42,6 @@
"entity_id"
]
},
- "interval": "1d",
+ "interval": "24h",
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
diff --git a/pkg/timestamp/duration.go b/pkg/timestamp/duration.go
new file mode 100644
index 0000000..fd7fdbb
--- /dev/null
+++ b/pkg/timestamp/duration.go
@@ -0,0 +1,53 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+ "strconv"
+ "strings"
+ "time"
+)
+
+// ParseDuration parses a duration string.
+// A duration string is a possibly signed sequence of
+// decimal numbers, each with optional fraction and a unit suffix,
+// such as "300ms", "-1.5h" or "2h45m".
+// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
+func ParseDuration(s string) (time.Duration, error) {
+ i := strings.Index(s, "d")
+ if i <= 0 {
+ return time.ParseDuration(s)
+ }
+ neg := false
+ if s != "" {
+ c := s[0]
+ if c == '-' || c == '+' {
+ neg = c == '-'
+ s = s[1:]
+ i--
+ }
+ }
+ d, err := strconv.Atoi(s[:i])
+ if neg {
+ d = -d
+ }
+ if err != nil {
+ return 0, err
+ }
+ return time.Hour * 24 * time.Duration(d), nil
+}
diff --git a/pkg/timestamp/duration_test.go b/pkg/timestamp/duration_test.go
new file mode 100644
index 0000000..1ce1d1b
--- /dev/null
+++ b/pkg/timestamp/duration_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+ "testing"
+ "time"
+)
+
+func TestParseDuration(t *testing.T) {
+ tests := []struct {
+ name string
+ arg string
+ want time.Duration
+ wantErr bool
+ }{
+ {
+ name: "one day",
+ arg: "1d",
+ want: time.Hour * 24,
+ },
+ {
+ name: "negative one day",
+ arg: "-1d",
+ want: -time.Hour * 24,
+ },
+ {
+ name: "5 hours",
+ arg: "5h",
+ want: time.Hour * 5,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := ParseDuration(tt.arg)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ParseDuration() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if got != tt.want {
+ t.Errorf("ParseDuration() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}