You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/26 08:19:09 UTC
[skywalking-banyandb] branch patch-path created (now 7b32b88)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch patch-path
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
at 7b32b88 Introduce gorilla encoder to measure
This branch includes the following new commits:
new 7b32b88 Introduce gorilla encoder to measure
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 01/01: Introduce gorilla encoder to measure
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch patch-path
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7b32b8894ec7d2d6f4bdbe69776d98210ca7fc84
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 26 08:11:42 2022 +0000
Introduce gorilla encoder to measure
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.gitignore | 3 +
api/proto/banyandb/database/v1/schema.pb.go | 1 +
api/proto/banyandb/database/v1/schema.proto | 1 +
banyand/liaison/grpc/measure_test.go | 17 +--
banyand/liaison/grpc/stream_test.go | 15 ++-
banyand/measure/encode.go | 123 +++++++++++++++++++++
banyand/measure/field_flag_test.go | 39 +++++++
banyand/measure/measure.go | 13 ++-
banyand/measure/measure_query.go | 4 +-
banyand/measure/measure_write.go | 19 +---
banyand/measure/metadata.go | 5 +-
banyand/stream/stream_query.go | 2 +-
banyand/stream/stream_write.go | 2 +-
banyand/tsdb/series_seek.go | 6 +-
banyand/tsdb/series_write.go | 2 +-
banyand/tsdb/seriesdb.go | 10 +-
banyand/tsdb/seriesdb_test.go | 110 +++++++++---------
pkg/bit/writer.go | 10 +-
pkg/encoding/int.go | 117 ++++++++++++++++----
pkg/encoding/int_test.go | 123 +++++++++++++++++++++
pkg/encoding/plain.go | 34 +++---
.../measure/testdata/measures/service_cpm_day.json | 2 +-
.../measures/service_instance_cpm_day.json | 2 +-
pkg/timestamp/duration.go | 53 +++++++++
pkg/timestamp/duration_test.go | 60 ++++++++++
25 files changed, 636 insertions(+), 137 deletions(-)
diff --git a/.gitignore b/.gitignore
index 1b3617e..2e3a8ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,6 @@ build/release/
# mock files
*mock.go
*mock_test.go
+
+# etcd unix sockets
+localhost:*
diff --git a/api/proto/banyandb/database/v1/schema.pb.go b/api/proto/banyandb/database/v1/schema.pb.go
index e9b503a..b49fbef 100644
--- a/api/proto/banyandb/database/v1/schema.pb.go
+++ b/api/proto/banyandb/database/v1/schema.pb.go
@@ -671,6 +671,7 @@ type Measure struct {
// entity indicates which tags will be to generate a series and shard a measure
Entity *Entity `protobuf:"bytes,4,opt,name=entity,proto3" json:"entity,omitempty"`
// interval indicates how frequently to send a data point
+ // valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
Interval string `protobuf:"bytes,5,opt,name=interval,proto3" json:"interval,omitempty"`
// updated_at indicates when the measure is updated
UpdatedAt *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"`
diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index 41df79d..86b9cad 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -103,6 +103,7 @@ message Measure {
// entity indicates which tags will be to generate a series and shard a measure
Entity entity = 4;
// interval indicates how frequently to send a data point
+ // valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
string interval = 5;
// updated_at indicates when the measure is updated
google.protobuf.Timestamp updated_at = 6;
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index 27fd27d..22289cc 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -38,19 +38,21 @@ import (
)
var _ = Describe("Measure", func() {
- var rootPath, metadataPath string
- var gracefulStop, deferRootFunc, deferMetadataFunc func()
+ var streamPath, measurePath, metadataPath string
+ var gracefulStop, deferStreamFunc, deferMeasureFunc, deferMetadataFunc func()
var conn *grpclib.ClientConn
BeforeEach(func() {
var err error
- rootPath, deferRootFunc, err = test.NewSpace()
+ streamPath, deferStreamFunc, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ measurePath, deferMeasureFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
metadataPath, deferMetadataFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
})
- It("is a plain server", func() {
+ FIt("is a plain server", func() {
By("Verifying an empty server")
- flags := []string{"--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ flags := []string{"--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
gracefulStop = setup(flags)
var err error
conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
@@ -75,7 +77,7 @@ var _ = Describe("Measure", func() {
}, defaultEventallyTimeout).Should(Equal(1))
})
It("is a TLS server", func() {
- flags := []string{"--tls=true", "--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ flags := []string{"--tls=true", "--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
_, currentFile, _, _ := runtime.Caller(0)
basePath := filepath.Dir(currentFile)
certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -98,7 +100,8 @@ var _ = Describe("Measure", func() {
_ = conn.Close()
gracefulStop()
deferMetadataFunc()
- deferRootFunc()
+ deferStreamFunc()
+ deferMeasureFunc()
})
})
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 8f02215..5e52e1b 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -45,19 +45,21 @@ import (
)
var _ = Describe("Stream", func() {
- var rootPath, metadataPath string
- var gracefulStop, deferRootFunc, deferMetadataFunc func()
+ var streamPath, measurePath, metadataPath string
+ var gracefulStop, deferStreamFunc, deferMeasureFunc, deferMetadataFunc func()
var conn *grpclib.ClientConn
BeforeEach(func() {
var err error
- rootPath, deferRootFunc, err = test.NewSpace()
+ streamPath, deferStreamFunc, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ measurePath, deferMeasureFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
metadataPath, deferMetadataFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
})
It("is a plain server", func() {
By("Verifying an empty server")
- flags := []string{"--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ flags := []string{"--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
gracefulStop = setup(flags)
var err error
conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
@@ -82,7 +84,7 @@ var _ = Describe("Stream", func() {
}, defaultEventallyTimeout).Should(Equal(1))
})
It("is a TLS server", func() {
- flags := []string{"--tls=true", "--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ flags := []string{"--tls=true", "--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
_, currentFile, _, _ := runtime.Caller(0)
basePath := filepath.Dir(currentFile)
certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -105,7 +107,8 @@ var _ = Describe("Stream", func() {
_ = conn.Close()
gracefulStop()
deferMetadataFunc()
- deferRootFunc()
+ deferStreamFunc()
+ deferMeasureFunc()
})
})
diff --git a/banyand/measure/encode.go b/banyand/measure/encode.go
new file mode 100644
index 0000000..3d6fcf3
--- /dev/null
+++ b/banyand/measure/encode.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+ "time"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+ _ encoding.SeriesEncoderPool = (*encoderPool)(nil)
+ _ encoding.SeriesDecoderPool = (*decoderPool)(nil)
+ intervalFn = func(key []byte) time.Duration {
+ _, interval, err := decodeFieldFlag(key)
+ if err != nil {
+ panic(err)
+ }
+ return interval
+ }
+)
+
+type encoderPool struct {
+ intPool encoding.SeriesEncoderPool
+ defaultPool encoding.SeriesEncoderPool
+ l *logger.Logger
+}
+
+func newEncoderPool(size int, l *logger.Logger) encoding.SeriesEncoderPool {
+ return &encoderPool{
+ intPool: encoding.NewIntEncoderPool(size, intervalFn),
+ defaultPool: encoding.NewPlainEncoderPool(size),
+ l: l,
+ }
+}
+
+func (p *encoderPool) Get(metadata []byte) encoding.SeriesEncoder {
+ fieldSpec, _, err := decodeFieldFlag(metadata)
+ if err != nil {
+ p.l.Err(err).Msg("failed to decode field flag")
+ return p.defaultPool.Get(metadata)
+ }
+ if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
+ return p.intPool.Get(metadata)
+ }
+ return p.defaultPool.Get(metadata)
+}
+
+func (p *encoderPool) Put(encoder encoding.SeriesEncoder) {
+ p.intPool.Put(encoder)
+ p.defaultPool.Put(encoder)
+}
+
+type decoderPool struct {
+ intPool encoding.SeriesDecoderPool
+ defaultPool encoding.SeriesDecoderPool
+ l *logger.Logger
+}
+
+func newDecoderPool(size int, l *logger.Logger) encoding.SeriesDecoderPool {
+ return &decoderPool{
+ intPool: encoding.NewIntDecoderPool(size, intervalFn),
+ defaultPool: encoding.NewPlainDecoderPool(size),
+ l: l,
+ }
+}
+
+func (p *decoderPool) Get(metadata []byte) encoding.SeriesDecoder {
+ fieldSpec, _, err := decodeFieldFlag(metadata)
+ if err != nil {
+ p.l.Err(err).Msg("failed to decode field flag")
+ return p.defaultPool.Get(metadata)
+ }
+ if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
+ return p.intPool.Get(metadata)
+ }
+ return p.defaultPool.Get(metadata)
+}
+
+func (p *decoderPool) Put(decoder encoding.SeriesDecoder) {
+ p.intPool.Put(decoder)
+ p.defaultPool.Put(decoder)
+}
+
+const fieldFlagLength = 9
+
+func encoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte {
+ encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
+ compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
+ bb := make([]byte, fieldFlagLength)
+ bb[0] = encodingMethod<<4 | compressionMethod
+ copy(bb[1:], convert.Int64ToBytes(int64(interval)))
+ return bb
+}
+
+func decodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) {
+ if len(key) < fieldFlagLength {
+ return nil, 0, ErrMalformedFieldFlag
+ }
+ b := key[len(key)-9:]
+ return &databasev1.FieldSpec{
+ EncodingMethod: databasev1.EncodingMethod(int32(b[0]) >> 4),
+ CompressionMethod: databasev1.CompressionMethod((int32(b[0] & 0x0F))),
+ }, time.Duration(convert.BytesToInt64(b[1:])), nil
+}
diff --git a/banyand/measure/field_flag_test.go b/banyand/measure/field_flag_test.go
new file mode 100644
index 0000000..baec5b8
--- /dev/null
+++ b/banyand/measure/field_flag_test.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+func TestEncodeFieldFlag(t *testing.T) {
+ flag := encoderFieldFlag(&databasev1.FieldSpec{
+ EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+ CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+ }, time.Minute)
+ fieldSpec, interval, err := decodeFieldFlag(flag)
+ assert.NoError(t, err)
+ assert.Equal(t, databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, fieldSpec.EncodingMethod)
+ assert.Equal(t, databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, fieldSpec.CompressionMethod)
+ assert.Equal(t, time.Minute, interval)
+}
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 481197d..d9688c9 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -19,6 +19,7 @@ package measure
import (
"context"
+ "time"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -27,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// a chunk is 1MB
@@ -44,6 +46,7 @@ type measure struct {
entityLocator partition.EntityLocator
indexRules []*databasev1.IndexRule
indexWriter *index.Writer
+ interval time.Duration
}
func (s *measure) GetSchema() *databasev1.Measure {
@@ -70,10 +73,14 @@ func (s *measure) Close() error {
return s.indexWriter.Close()
}
-func (s *measure) parseSpec() {
+func (s *measure) parseSpec() (err error) {
s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+ if s.schema.Interval != "" {
+ s.interval, err = timestamp.ParseDuration(s.schema.Interval)
+ }
+ return err
}
type measureSpec struct {
@@ -88,7 +95,9 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
indexRules: spec.indexRules,
l: l,
}
- sm.parseSpec()
+ if err := sm.parseSpec(); err != nil {
+ return nil, err
+ }
ctx := context.WithValue(context.Background(), logger.ContextKey, l)
sm.db = db
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 4d482b2..2d9473f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -91,7 +91,7 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
}
func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
- familyRawBytes, err := item.Family(string(familyIdentity(family, TagFlag)))
+ familyRawBytes, err := item.Family(familyIdentity(family, TagFlag))
if err != nil {
return nil, err
}
@@ -132,7 +132,7 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
break
}
}
- bytes, err := item.Family(string(familyIdentity(name, encoderFieldFlag(fieldSpec))))
+ bytes, err := item.Family(familyIdentity(name, encoderFieldFlag(fieldSpec, s.interval)))
if err != nil {
return nil, err
}
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index d5f3db8..efb5ac5 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -37,11 +37,10 @@ import (
)
var (
- ErrMalformedElement = errors.New("element is malformed")
-)
+ ErrMalformedElement = errors.New("element is malformed")
+ ErrMalformedFieldFlag = errors.New("field flag is malformed")
-const (
- TagFlag byte = iota
+ TagFlag []byte = make([]byte, fieldFlagLength)
)
func (s *measure) Write(value *measurev1.DataPointValue) error {
@@ -129,7 +128,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
if data == nil {
continue
}
- builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec)), data)
+ builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec, s.interval)), data)
}
writer, errWrite := builder.Build()
if errWrite != nil {
@@ -196,8 +195,8 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
return
}
-func familyIdentity(name string, flag byte) []byte {
- return bytes.Join([][]byte{[]byte(name), {flag}}, nil)
+func familyIdentity(name string, flag []byte) []byte {
+ return bytes.Join([][]byte{tsdb.Hash([]byte(name)), flag}, nil)
}
func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
@@ -223,9 +222,3 @@ func decodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *model
}
return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
}
-
-func encoderFieldFlag(fieldSpec *databasev1.FieldSpec) byte {
- encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
- compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
- return encodingMethod<<4 | compressionMethod
-}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 0499083..14fae45 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -30,7 +30,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
- "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
@@ -205,8 +204,8 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
Location: path.Join(s.path, groupSchema.Metadata.Name),
ShardNum: groupSchema.ResourceOpts.ShardNum,
EncodingMethod: tsdb.EncodingMethod{
- EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
- DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+ EncoderPool: newEncoderPool(chunkSize, s.l),
+ DecoderPool: newDecoderPool(chunkSize, s.l),
},
})
}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 493966f..97640e8 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -88,7 +88,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
}
func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
- familyRawBytes, err := item.Family(family)
+ familyRawBytes, err := item.Family(tsdb.Hash([]byte(family)))
if err != nil {
return nil, errors.Wrapf(err, "parse family %s", family)
}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index d73130e..c8d7ac7 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -102,7 +102,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
if errMarshal != nil {
return nil, errMarshal
}
- builder.Family([]byte(sm.GetTagFamilies()[fi].GetName()), bb)
+ builder.Family(tsdb.Hash([]byte(sm.GetTagFamilies()[fi].GetName())), bb)
}
builder.Val([]byte(value.GetElementId()))
writer, errWrite := builder.Build()
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index eec6104..9051ed4 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -32,7 +32,7 @@ type Iterator interface {
}
type Item interface {
- Family(family string) ([]byte, error)
+ Family(family []byte) ([]byte, error)
Val() ([]byte, error)
ID() common.ItemID
SortedField() []byte
@@ -119,10 +119,10 @@ func (i *item) SortedField() []byte {
return i.sortedField
}
-func (i *item) Family(family string) ([]byte, error) {
+func (i *item) Family(family []byte) ([]byte, error) {
d := dataBucket{
seriesID: i.seriesID,
- family: []byte(family),
+ family: family,
}
return i.data.Get(d.marshal(), uint64(i.itemID))
}
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 9d8d97d..6a405bf 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -175,7 +175,7 @@ func (d dataBucket) marshal() []byte {
}
return bytes.Join([][]byte{
d.seriesID.Marshal(),
- hash(d.family),
+ d.family,
}, nil)
}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 4afb96f..d6ea39d 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -81,7 +81,7 @@ func NewPath(entries []Entry) Path {
p.template = append(p.template, zeroIntBytes...)
continue
}
- entry := hash(e)
+ entry := Hash(e)
if !encounterAny {
p.offset += 8
}
@@ -105,7 +105,7 @@ func (p *Path) extractPrefix() {
}
func (p Path) Prepand(entry Entry) Path {
- e := hash(entry)
+ e := Hash(entry)
var prepand = func(src []byte, entry []byte) []byte {
dst := make([]byte, len(src)+len(entry))
copy(dst, entry)
@@ -156,7 +156,7 @@ func (s *seriesDB) GetByHashKey(key []byte) (Series, error) {
}
s.Lock()
defer s.Unlock()
- seriesID = hash(key)
+ seriesID = Hash(key)
err = s.seriesMetadata.Put(key, seriesID)
if err != nil {
return nil, err
@@ -277,7 +277,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string,
func HashEntity(entity Entity) []byte {
result := make(Entry, 0, len(entity)*8)
for _, entry := range entity {
- result = append(result, hash(entry)...)
+ result = append(result, Hash(entry)...)
}
return result
}
@@ -286,7 +286,7 @@ func SeriesID(entity Entity) common.SeriesID {
return common.SeriesID(convert.Hash((HashEntity(entity))))
}
-func hash(entry []byte) []byte {
+func Hash(entry []byte) []byte {
return convert.Uint64ToBytes(convert.Hash(entry))
}
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 2838624..f7163dc 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -66,19 +66,19 @@ func TestNewPath(t *testing.T) {
want: Path{
isFull: true,
prefix: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -104,8 +104,8 @@ func TestNewPath(t *testing.T) {
}, nil),
template: bytes.Join([][]byte{
zeroIntBytes,
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
zeroIntBytes,
@@ -124,17 +124,17 @@ func TestNewPath(t *testing.T) {
},
want: Path{
prefix: bytes.Join([][]byte{
- hash([]byte("productpage")),
+ Hash([]byte("productpage")),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("productpage")),
+ Hash([]byte("productpage")),
zeroIntBytes,
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("productpage")),
+ Hash([]byte("productpage")),
zeroIntBytes,
- hash(convert.Uint64ToBytes(0)),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -153,17 +153,17 @@ func TestNewPath(t *testing.T) {
},
want: Path{
prefix: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
zeroIntBytes,
}, nil),
mask: bytes.Join([][]byte{
@@ -185,22 +185,22 @@ func TestNewPath(t *testing.T) {
want: Path{
isFull: true,
prefix: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -220,18 +220,18 @@ func TestNewPath(t *testing.T) {
},
scope: Entry("segment"),
want: Path{
- prefix: hash([]byte("segment")),
+ prefix: Hash([]byte("segment")),
seekKey: bytes.Join([][]byte{
- hash([]byte("segment")),
+ Hash([]byte("segment")),
zeroIntBytes,
zeroIntBytes,
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("segment")),
+ Hash([]byte("segment")),
zeroIntBytes,
- hash([]byte("10.0.0.1")),
- hash(convert.Uint64ToBytes(0)),
+ Hash([]byte("10.0.0.1")),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -252,20 +252,20 @@ func TestNewPath(t *testing.T) {
scope: Entry("segment"),
want: Path{
prefix: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
zeroIntBytes,
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
zeroIntBytes,
- hash(convert.Uint64ToBytes(0)),
+ Hash(convert.Uint64ToBytes(0)),
}, nil),
mask: bytes.Join([][]byte{
maxIntBytes,
@@ -286,20 +286,20 @@ func TestNewPath(t *testing.T) {
scope: Entry("segment"),
want: Path{
prefix: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
}, nil),
seekKey: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
zeroIntBytes,
}, nil),
template: bytes.Join([][]byte{
- hash([]byte("segment")),
- hash([]byte("productpage")),
- hash([]byte("10.0.0.1")),
+ Hash([]byte("segment")),
+ Hash([]byte("productpage")),
+ Hash([]byte("10.0.0.1")),
zeroIntBytes,
}, nil),
mask: bytes.Join([][]byte{
@@ -510,7 +510,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
},
}
for _, d := range data {
- d.id = common.SeriesID(convert.BytesToUint64(hash(HashEntity(d.entity))))
+ d.id = common.SeriesID(convert.BytesToUint64(Hash(HashEntity(d.entity))))
series, err := db.Get(d.entity)
t.NoError(err)
t.Greater(uint(series.ID()), uint(0))
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
index 6302f9e..9995f01 100644
--- a/pkg/bit/writer.go
+++ b/pkg/bit/writer.go
@@ -30,14 +30,18 @@ type Writer struct {
// NewWriter create bit writer
func NewWriter(buffer *bytes.Buffer) *Writer {
- var bw Writer
+ bw := new(Writer)
bw.Reset(buffer)
- return &bw
+ return bw
}
// Reset writes to a new writer
func (w *Writer) Reset(buffer *bytes.Buffer) {
- w.out = buffer
+ if buffer == nil {
+ w.out.Reset()
+ } else {
+ w.out = buffer
+ }
w.cache = 0
w.available = 8
}
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index b8887ec..79fd69f 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -20,6 +20,9 @@ package encoding
import (
"bytes"
"encoding/binary"
+ "errors"
+ "io"
+ "sync"
"time"
"github.com/apache/skywalking-banyandb/pkg/bit"
@@ -27,6 +30,74 @@ import (
"github.com/apache/skywalking-banyandb/pkg/convert"
)
+var (
+ intEncoderPool = sync.Pool{
+ New: newIntEncoder,
+ }
+ intDecoderPool = sync.Pool{
+ New: func() interface{} {
+ return &intDecoder{}
+ },
+ }
+)
+
+type intEncoderPoolDelegator struct {
+ pool *sync.Pool
+ size int
+ fn ParseInterval
+}
+
+func NewIntEncoderPool(size int, fn ParseInterval) SeriesEncoderPool {
+ return &intEncoderPoolDelegator{
+ pool: &intEncoderPool,
+ size: size,
+ fn: fn,
+ }
+}
+
+func (b *intEncoderPoolDelegator) Get(metadata []byte) SeriesEncoder {
+ encoder := b.pool.Get().(*intEncoder)
+ encoder.size = b.size
+ encoder.fn = b.fn
+ encoder.Reset(metadata)
+ return encoder
+}
+
+func (b *intEncoderPoolDelegator) Put(encoder SeriesEncoder) {
+ _, ok := encoder.(*intEncoder)
+ if ok {
+ b.pool.Put(encoder)
+ }
+}
+
+type intDecoderPoolDelegator struct {
+ pool *sync.Pool
+ size int
+ fn ParseInterval
+}
+
+func NewIntDecoderPool(size int, fn ParseInterval) SeriesDecoderPool {
+ return &intDecoderPoolDelegator{
+ pool: &intDecoderPool,
+ size: size,
+ fn: fn,
+ }
+}
+
+func (b *intDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
+ decoder := b.pool.Get().(*intDecoder)
+ decoder.size = b.size
+ decoder.fn = b.fn
+ return decoder
+}
+
+func (b *intDecoderPoolDelegator) Put(decoder SeriesDecoder) {
+ _, ok := decoder.(*intDecoder)
+ if ok {
+ b.pool.Put(decoder)
+ }
+}
+
var (
_ SeriesEncoder = (*intEncoder)(nil)
)
@@ -40,19 +111,18 @@ type intEncoder struct {
fn ParseInterval
interval time.Duration
startTime uint64
+ prevTime uint64
num int
size int
}
-func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+func newIntEncoder() interface{} {
buff := &bytes.Buffer{}
bw := bit.NewWriter(buff)
return &intEncoder{
buff: buff,
bw: bw,
values: NewXOREncoder(bw),
- fn: fn,
- size: size,
}
}
@@ -62,8 +132,9 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
}
if ie.startTime == 0 {
ie.startTime = ts
+ ie.prevTime = ts
}
- gap := int(ts) - int(ie.startTime)
+ gap := int(ts) - int(ie.prevTime)
if gap < 0 {
return
}
@@ -72,8 +143,9 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
ie.bw.WriteBool(false)
ie.num++
}
+ ie.prevTime = ts
ie.bw.WriteBool(len(value) > 0)
- ie.values.Write(binary.LittleEndian.Uint64(value))
+ ie.values.Write(convert.BytesToUint64(value))
ie.num++
}
@@ -84,6 +156,8 @@ func (ie *intEncoder) IsFull() bool {
func (ie *intEncoder) Reset(key []byte) {
ie.bw.Reset(nil)
ie.interval = ie.fn(key)
+ ie.startTime = 0
+ ie.prevTime = 0
}
func (ie *intEncoder) Encode() ([]byte, error) {
@@ -91,7 +165,7 @@ func (ie *intEncoder) Encode() ([]byte, error) {
buffWriter := buffer.NewBufferWriter(ie.buff)
buffWriter.PutUint64(ie.startTime)
buffWriter.PutUint16(uint16(ie.size))
- return ie.buff.Bytes(), nil
+ return buffWriter.Bytes(), nil
}
func (ie *intEncoder) StartTime() uint64 {
@@ -109,14 +183,7 @@ type intDecoder struct {
area []byte
}
-func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
- return &intDecoder{
- fn: fn,
- size: size,
- }
-}
-
-func (i intDecoder) Decode(key, data []byte) error {
+func (i *intDecoder) Decode(key, data []byte) error {
i.interval = i.fn(key)
i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -148,14 +215,19 @@ func (i intDecoder) Iterator() SeriesIterator {
interval: int(i.interval),
br: br,
values: NewXORDecoder(br),
+ size: i.size,
}
}
-var _ SeriesIterator = (*intIterator)(nil)
+var (
+ _ SeriesIterator = (*intIterator)(nil)
+ zero = convert.BytesToUint64(convert.Int64ToBytes(0))
+)
type intIterator struct {
startTime uint64
interval int
+ size int
br *bit.Reader
values *XORDecoder
@@ -166,17 +238,24 @@ type intIterator struct {
}
func (i *intIterator) Next() bool {
- var b bool
- b, i.err = i.br.ReadBool()
- if i.err != nil {
+ if i.index >= i.size {
+ return false
+ }
+ b, err := i.br.ReadBool()
+ if errors.Is(err, io.EOF) {
+ return false
+ }
+ if err != nil {
+ i.err = err
return false
}
if b {
if i.values.Next() {
i.currVal = i.values.Value()
}
+ } else {
+ i.currVal = zero
}
- i.currVal = 0
i.currTime = i.startTime + uint64(i.interval*i.index)
i.index++
return true
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
new file mode 100644
index 0000000..3c686d6
--- /dev/null
+++ b/pkg/encoding/int_test.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func TestNewIntEncoderAndDecoder(t *testing.T) {
+ type tsData struct {
+ ts []uint64
+ data []int64
+ }
+ tests := []struct {
+ name string
+ args tsData
+ want tsData
+ }{
+ {
+ name: "golden path",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ },
+ },
+ {
+ name: "more than the size",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9, 6},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 8, 7, 9},
+ },
+ },
+ {
+ name: "less than the size",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ data: []int64{7, 8, 7},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+ data: []int64{7, 8, 7},
+ },
+ },
+ {
+ name: "empty slot in the middle",
+ args: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 9},
+ },
+ want: tsData{
+ ts: []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+ data: []int64{7, 0, 0, 9},
+ },
+ },
+ }
+ key := []byte("foo")
+ fn := func(k []byte) time.Duration {
+ assert.Equal(t, key, k)
+ return 1 * time.Minute
+ }
+ encoderPool := NewIntEncoderPool(3, fn)
+ decoderPool := NewIntDecoderPool(3, fn)
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ at := assert.New(t)
+ encoder := encoderPool.Get(key)
+ decoder := decoderPool.Get(key)
+ encoder.Reset(key)
+ for i, v := range tt.args.ts {
+ encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
+ if encoder.IsFull() {
+ break
+ }
+ }
+ bb, err := encoder.Encode()
+ at.NoError(err)
+ at.NoError(decoder.Decode(key, bb))
+ at.True(decoder.IsFull())
+ iter := decoder.Iterator()
+ i := 0
+ for ; iter.Next(); i++ {
+ at.NoError(iter.Error())
+ at.Equal(tt.want.ts[i], iter.Time())
+ at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
+ v, err := decoder.Get(tt.want.ts[i])
+ at.NoError(err)
+ at.Equal(tt.want.data[i], convert.BytesToInt64(v))
+ }
+ if i == 0 {
+ at.Fail("empty data")
+ }
+ })
+ }
+}
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index e511441..626e3e9 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -31,59 +31,65 @@ import (
)
var (
- encoderPool = sync.Pool{
+ plainEncoderPool = sync.Pool{
New: newPlainEncoder,
}
- decoderPool = sync.Pool{
+ plainDecoderPool = sync.Pool{
New: func() interface{} {
return &plainDecoder{}
},
}
)
-type plainEncoderPool struct {
+type plainEncoderPoolDelegator struct {
pool *sync.Pool
size int
}
func NewPlainEncoderPool(size int) SeriesEncoderPool {
- return &plainEncoderPool{
- pool: &encoderPool,
+ return &plainEncoderPoolDelegator{
+ pool: &plainEncoderPool,
size: size,
}
}
-func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+func (b *plainEncoderPoolDelegator) Get(metadata []byte) SeriesEncoder {
encoder := b.pool.Get().(*plainEncoder)
encoder.Reset(metadata)
encoder.valueSize = b.size
return encoder
}
-func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
- b.pool.Put(encoder)
+func (b *plainEncoderPoolDelegator) Put(encoder SeriesEncoder) {
+ _, ok := encoder.(*plainEncoder)
+ if ok {
+ b.pool.Put(encoder)
+ }
}
-type plainDecoderPool struct {
+type plainDecoderPoolDelegator struct {
pool *sync.Pool
size int
}
func NewPlainDecoderPool(size int) SeriesDecoderPool {
- return &plainDecoderPool{
- pool: &decoderPool,
+ return &plainDecoderPoolDelegator{
+ pool: &plainDecoderPool,
size: size,
}
}
-func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+func (b *plainDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
decoder := b.pool.Get().(*plainDecoder)
decoder.valueSize = b.size
return decoder
}
-func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
- b.pool.Put(decoder)
+func (b *plainDecoderPoolDelegator) Put(decoder SeriesDecoder) {
+ _, ok := decoder.(*plainDecoder)
+ if ok {
+ b.pool.Put(decoder)
+ }
}
var (
diff --git a/pkg/test/measure/testdata/measures/service_cpm_day.json b/pkg/test/measure/testdata/measures/service_cpm_day.json
index 5e8bd75..0a4190d 100644
--- a/pkg/test/measure/testdata/measures/service_cpm_day.json
+++ b/pkg/test/measure/testdata/measures/service_cpm_day.json
@@ -37,6 +37,6 @@
"entity_id"
]
},
- "interval": "1d",
+ "interval": "24h",
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measures/service_instance_cpm_day.json b/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
index 9476244..36d17ce 100644
--- a/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
+++ b/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
@@ -42,6 +42,6 @@
"entity_id"
]
},
- "interval": "1d",
+ "interval": "24h",
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
diff --git a/pkg/timestamp/duration.go b/pkg/timestamp/duration.go
new file mode 100644
index 0000000..fd7fdbb
--- /dev/null
+++ b/pkg/timestamp/duration.go
@@ -0,0 +1,53 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+ "strconv"
+ "strings"
+ "time"
+)
+
+// ParseDuration parses a duration string.
+// A duration string is a possibly signed sequence of
+// decimal numbers, each with optional fraction and a unit suffix,
+// such as "300ms", "-1.5h" or "2h45m".
+// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d"; the "d" unit must appear alone (e.g. "2d") and cannot be combined with other units.
+func ParseDuration(s string) (time.Duration, error) {
+ i := strings.Index(s, "d")
+ if i <= 0 {
+ return time.ParseDuration(s)
+ }
+ neg := false
+ if s != "" {
+ c := s[0]
+ if c == '-' || c == '+' {
+ neg = c == '-'
+ s = s[1:]
+ i--
+ }
+ }
+ d, err := strconv.Atoi(s[:i])
+ if err != nil {
+ return 0, err
+ }
+ if neg {
+ d = -d
+ }
+ return time.Hour * 24 * time.Duration(d), nil
+}
diff --git a/pkg/timestamp/duration_test.go b/pkg/timestamp/duration_test.go
new file mode 100644
index 0000000..1ce1d1b
--- /dev/null
+++ b/pkg/timestamp/duration_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+ "testing"
+ "time"
+)
+
+func TestParseDuration(t *testing.T) {
+ tests := []struct {
+ name string
+ arg string
+ want time.Duration
+ wantErr bool
+ }{
+ {
+ name: "one day",
+ arg: "1d",
+ want: time.Hour * 24,
+ },
+ {
+ name: "negative one day",
+ arg: "-1d",
+ want: -time.Hour * 24,
+ },
+ {
+ name: "5 hours",
+ arg: "5h",
+ want: time.Hour * 5,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := ParseDuration(tt.arg)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ParseDuration() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if got != tt.want {
+ t.Errorf("ParseDuration() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}