You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/26 08:19:10 UTC

[skywalking-banyandb] 01/01: Introduce gorilla encoder to measure

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch patch-path
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7b32b8894ec7d2d6f4bdbe69776d98210ca7fc84
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 26 08:11:42 2022 +0000

    Introduce gorilla encoder to measure
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .gitignore                                         |   3 +
 api/proto/banyandb/database/v1/schema.pb.go        |   1 +
 api/proto/banyandb/database/v1/schema.proto        |   1 +
 banyand/liaison/grpc/measure_test.go               |  17 +--
 banyand/liaison/grpc/stream_test.go                |  15 ++-
 banyand/measure/encode.go                          | 123 +++++++++++++++++++++
 banyand/measure/field_flag_test.go                 |  39 +++++++
 banyand/measure/measure.go                         |  13 ++-
 banyand/measure/measure_query.go                   |   4 +-
 banyand/measure/measure_write.go                   |  19 +---
 banyand/measure/metadata.go                        |   5 +-
 banyand/stream/stream_query.go                     |   2 +-
 banyand/stream/stream_write.go                     |   2 +-
 banyand/tsdb/series_seek.go                        |   6 +-
 banyand/tsdb/series_write.go                       |   2 +-
 banyand/tsdb/seriesdb.go                           |  10 +-
 banyand/tsdb/seriesdb_test.go                      | 110 +++++++++---------
 pkg/bit/writer.go                                  |  10 +-
 pkg/encoding/int.go                                | 117 ++++++++++++++++----
 pkg/encoding/int_test.go                           | 123 +++++++++++++++++++++
 pkg/encoding/plain.go                              |  34 +++---
 .../measure/testdata/measures/service_cpm_day.json |   2 +-
 .../measures/service_instance_cpm_day.json         |   2 +-
 pkg/timestamp/duration.go                          |  53 +++++++++
 pkg/timestamp/duration_test.go                     |  60 ++++++++++
 25 files changed, 636 insertions(+), 137 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1b3617e..2e3a8ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,6 @@ build/release/
 # mock files
 *mock.go
 *mock_test.go
+
+# etcd unix sockets
+localhost:*
diff --git a/api/proto/banyandb/database/v1/schema.pb.go b/api/proto/banyandb/database/v1/schema.pb.go
index e9b503a..b49fbef 100644
--- a/api/proto/banyandb/database/v1/schema.pb.go
+++ b/api/proto/banyandb/database/v1/schema.pb.go
@@ -671,6 +671,7 @@ type Measure struct {
 	// entity indicates which tags will be to generate a series and shard a measure
 	Entity *Entity `protobuf:"bytes,4,opt,name=entity,proto3" json:"entity,omitempty"`
 	// interval indicates how frequently to send a data point
+	// valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
 	Interval string `protobuf:"bytes,5,opt,name=interval,proto3" json:"interval,omitempty"`
 	// updated_at indicates when the measure is updated
 	UpdatedAt *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"`
diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index 41df79d..86b9cad 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -103,6 +103,7 @@ message Measure {
     // entity indicates which tags will be to generate a series and shard a measure
     Entity entity = 4;
     // interval indicates how frequently to send a data point
+    // valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
     string interval = 5;
     // updated_at indicates when the measure is updated
     google.protobuf.Timestamp updated_at = 6;
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index 27fd27d..22289cc 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -38,19 +38,21 @@ import (
 )
 
 var _ = Describe("Measure", func() {
-	var rootPath, metadataPath string
-	var gracefulStop, deferRootFunc, deferMetadataFunc func()
+	var streamPath, measurePath, metadataPath string
+	var gracefulStop, deferStreamFunc, deferMeasureFunc, deferMetadataFunc func()
 	var conn *grpclib.ClientConn
 	BeforeEach(func() {
 		var err error
-		rootPath, deferRootFunc, err = test.NewSpace()
+		streamPath, deferStreamFunc, err = test.NewSpace()
+		Expect(err).NotTo(HaveOccurred())
+		measurePath, deferMeasureFunc, err = test.NewSpace()
 		Expect(err).NotTo(HaveOccurred())
 		metadataPath, deferMetadataFunc, err = test.NewSpace()
 		Expect(err).NotTo(HaveOccurred())
 	})
-	It("is a plain server", func() {
+	FIt("is a plain server", func() {
 		By("Verifying an empty server")
-		flags := []string{"--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
 		gracefulStop = setup(flags)
 		var err error
 		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
@@ -75,7 +77,7 @@ var _ = Describe("Measure", func() {
 		}, defaultEventallyTimeout).Should(Equal(1))
 	})
 	It("is a TLS server", func() {
-		flags := []string{"--tls=true", "--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--tls=true", "--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
 		_, currentFile, _, _ := runtime.Caller(0)
 		basePath := filepath.Dir(currentFile)
 		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -98,7 +100,8 @@ var _ = Describe("Measure", func() {
 		_ = conn.Close()
 		gracefulStop()
 		deferMetadataFunc()
-		deferRootFunc()
+		deferStreamFunc()
+		deferMeasureFunc()
 	})
 })
 
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 8f02215..5e52e1b 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -45,19 +45,21 @@ import (
 )
 
 var _ = Describe("Stream", func() {
-	var rootPath, metadataPath string
-	var gracefulStop, deferRootFunc, deferMetadataFunc func()
+	var streamPath, measurePath, metadataPath string
+	var gracefulStop, deferStreamFunc, deferMeasureFunc, deferMetadataFunc func()
 	var conn *grpclib.ClientConn
 	BeforeEach(func() {
 		var err error
-		rootPath, deferRootFunc, err = test.NewSpace()
+		streamPath, deferStreamFunc, err = test.NewSpace()
+		Expect(err).NotTo(HaveOccurred())
+		measurePath, deferMeasureFunc, err = test.NewSpace()
 		Expect(err).NotTo(HaveOccurred())
 		metadataPath, deferMetadataFunc, err = test.NewSpace()
 		Expect(err).NotTo(HaveOccurred())
 	})
 	It("is a plain server", func() {
 		By("Verifying an empty server")
-		flags := []string{"--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
 		gracefulStop = setup(flags)
 		var err error
 		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
@@ -82,7 +84,7 @@ var _ = Describe("Stream", func() {
 		}, defaultEventallyTimeout).Should(Equal(1))
 	})
 	It("is a TLS server", func() {
-		flags := []string{"--tls=true", "--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--tls=true", "--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
 		_, currentFile, _, _ := runtime.Caller(0)
 		basePath := filepath.Dir(currentFile)
 		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -105,7 +107,8 @@ var _ = Describe("Stream", func() {
 		_ = conn.Close()
 		gracefulStop()
 		deferMetadataFunc()
-		deferRootFunc()
+		deferStreamFunc()
+		deferMeasureFunc()
 	})
 })
 
diff --git a/banyand/measure/encode.go b/banyand/measure/encode.go
new file mode 100644
index 0000000..3d6fcf3
--- /dev/null
+++ b/banyand/measure/encode.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+	"time"
+
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	_          encoding.SeriesEncoderPool = (*encoderPool)(nil)
+	_          encoding.SeriesDecoderPool = (*decoderPool)(nil)
+	intervalFn                            = func(key []byte) time.Duration {
+		_, interval, err := decodeFieldFlag(key)
+		if err != nil {
+			panic(err)
+		}
+		return interval
+	}
+)
+
+type encoderPool struct {
+	intPool     encoding.SeriesEncoderPool
+	defaultPool encoding.SeriesEncoderPool
+	l           *logger.Logger
+}
+
+func newEncoderPool(size int, l *logger.Logger) encoding.SeriesEncoderPool {
+	return &encoderPool{
+		intPool:     encoding.NewIntEncoderPool(size, intervalFn),
+		defaultPool: encoding.NewPlainEncoderPool(size),
+		l:           l,
+	}
+}
+
+func (p *encoderPool) Get(metadata []byte) encoding.SeriesEncoder {
+	fieldSpec, _, err := decodeFieldFlag(metadata)
+	if err != nil {
+		p.l.Err(err).Msg("failed to decode field flag")
+		return p.defaultPool.Get(metadata)
+	}
+	if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
+		return p.intPool.Get(metadata)
+	}
+	return p.defaultPool.Get(metadata)
+}
+
+func (p *encoderPool) Put(encoder encoding.SeriesEncoder) {
+	p.intPool.Put(encoder)
+	p.defaultPool.Put(encoder)
+}
+
+type decoderPool struct {
+	intPool     encoding.SeriesDecoderPool
+	defaultPool encoding.SeriesDecoderPool
+	l           *logger.Logger
+}
+
+func newDecoderPool(size int, l *logger.Logger) encoding.SeriesDecoderPool {
+	return &decoderPool{
+		intPool:     encoding.NewIntDecoderPool(size, intervalFn),
+		defaultPool: encoding.NewPlainDecoderPool(size),
+		l:           l,
+	}
+}
+
+func (p *decoderPool) Get(metadata []byte) encoding.SeriesDecoder {
+	fieldSpec, _, err := decodeFieldFlag(metadata)
+	if err != nil {
+		p.l.Err(err).Msg("failed to decode field flag")
+		return p.defaultPool.Get(metadata)
+	}
+	if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
+		return p.intPool.Get(metadata)
+	}
+	return p.defaultPool.Get(metadata)
+}
+
+func (p *decoderPool) Put(decoder encoding.SeriesDecoder) {
+	p.intPool.Put(decoder)
+	p.defaultPool.Put(decoder)
+}
+
+const fieldFlagLength = 9
+
+func encoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte {
+	encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
+	compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
+	bb := make([]byte, fieldFlagLength)
+	bb[0] = encodingMethod<<4 | compressionMethod
+	copy(bb[1:], convert.Int64ToBytes(int64(interval)))
+	return bb
+}
+
+func decodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) {
+	if len(key) < fieldFlagLength {
+		return nil, 0, ErrMalformedFieldFlag
+	}
+	b := key[len(key)-9:]
+	return &databasev1.FieldSpec{
+		EncodingMethod:    databasev1.EncodingMethod(int32(b[0]) >> 4),
+		CompressionMethod: databasev1.CompressionMethod((int32(b[0] & 0x0F))),
+	}, time.Duration(convert.BytesToInt64(b[1:])), nil
+}
diff --git a/banyand/measure/field_flag_test.go b/banyand/measure/field_flag_test.go
new file mode 100644
index 0000000..baec5b8
--- /dev/null
+++ b/banyand/measure/field_flag_test.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+func TestEncodeFieldFlag(t *testing.T) {
+	flag := encoderFieldFlag(&databasev1.FieldSpec{
+		EncodingMethod:    databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+		CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+	}, time.Minute)
+	fieldSpec, interval, err := decodeFieldFlag(flag)
+	assert.NoError(t, err)
+	assert.Equal(t, databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, fieldSpec.EncodingMethod)
+	assert.Equal(t, databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, fieldSpec.CompressionMethod)
+	assert.Equal(t, time.Minute, interval)
+}
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 481197d..d9688c9 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -19,6 +19,7 @@ package measure
 
 import (
 	"context"
+	"time"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -27,6 +28,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 // a chunk is 1MB
@@ -44,6 +46,7 @@ type measure struct {
 	entityLocator          partition.EntityLocator
 	indexRules             []*databasev1.IndexRule
 	indexWriter            *index.Writer
+	interval               time.Duration
 }
 
 func (s *measure) GetSchema() *databasev1.Measure {
@@ -70,10 +73,14 @@ func (s *measure) Close() error {
 	return s.indexWriter.Close()
 }
 
-func (s *measure) parseSpec() {
+func (s *measure) parseSpec() (err error) {
 	s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
 	s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
 	s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+	if s.schema.Interval != "" {
+		s.interval, err = timestamp.ParseDuration(s.schema.Interval)
+	}
+	return err
 }
 
 type measureSpec struct {
@@ -88,7 +95,9 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
 		indexRules: spec.indexRules,
 		l:          l,
 	}
-	sm.parseSpec()
+	if err := sm.parseSpec(); err != nil {
+		return nil, err
+	}
 	ctx := context.WithValue(context.Background(), logger.ContextKey, l)
 
 	sm.db = db
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 4d482b2..2d9473f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -91,7 +91,7 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
-	familyRawBytes, err := item.Family(string(familyIdentity(family, TagFlag)))
+	familyRawBytes, err := item.Family(familyIdentity(family, TagFlag))
 	if err != nil {
 		return nil, err
 	}
@@ -132,7 +132,7 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
 			break
 		}
 	}
-	bytes, err := item.Family(string(familyIdentity(name, encoderFieldFlag(fieldSpec))))
+	bytes, err := item.Family(familyIdentity(name, encoderFieldFlag(fieldSpec, s.interval)))
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index d5f3db8..efb5ac5 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -37,11 +37,10 @@ import (
 )
 
 var (
-	ErrMalformedElement = errors.New("element is malformed")
-)
+	ErrMalformedElement   = errors.New("element is malformed")
+	ErrMalformedFieldFlag = errors.New("field flag is malformed")
 
-const (
-	TagFlag byte = iota
+	TagFlag []byte = make([]byte, fieldFlagLength)
 )
 
 func (s *measure) Write(value *measurev1.DataPointValue) error {
@@ -129,7 +128,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 			if data == nil {
 				continue
 			}
-			builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec)), data)
+			builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec, s.interval)), data)
 		}
 		writer, errWrite := builder.Build()
 		if errWrite != nil {
@@ -196,8 +195,8 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 	return
 }
 
-func familyIdentity(name string, flag byte) []byte {
-	return bytes.Join([][]byte{[]byte(name), {flag}}, nil)
+func familyIdentity(name string, flag []byte) []byte {
+	return bytes.Join([][]byte{tsdb.Hash([]byte(name)), flag}, nil)
 }
 
 func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
@@ -223,9 +222,3 @@ func decodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *model
 	}
 	return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
 }
-
-func encoderFieldFlag(fieldSpec *databasev1.FieldSpec) byte {
-	encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
-	compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
-	return encodingMethod<<4 | compressionMethod
-}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 0499083..14fae45 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -30,7 +30,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
-	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
@@ -205,8 +204,8 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
 			Location: path.Join(s.path, groupSchema.Metadata.Name),
 			ShardNum: groupSchema.ResourceOpts.ShardNum,
 			EncodingMethod: tsdb.EncodingMethod{
-				EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
-				DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+				EncoderPool: newEncoderPool(chunkSize, s.l),
+				DecoderPool: newDecoderPool(chunkSize, s.l),
 			},
 		})
 }
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 493966f..97640e8 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -88,7 +88,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
-	familyRawBytes, err := item.Family(family)
+	familyRawBytes, err := item.Family(tsdb.Hash([]byte(family)))
 	if err != nil {
 		return nil, errors.Wrapf(err, "parse family %s", family)
 	}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index d73130e..c8d7ac7 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -102,7 +102,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 			if errMarshal != nil {
 				return nil, errMarshal
 			}
-			builder.Family([]byte(sm.GetTagFamilies()[fi].GetName()), bb)
+			builder.Family(tsdb.Hash([]byte(sm.GetTagFamilies()[fi].GetName())), bb)
 		}
 		builder.Val([]byte(value.GetElementId()))
 		writer, errWrite := builder.Build()
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index eec6104..9051ed4 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -32,7 +32,7 @@ type Iterator interface {
 }
 
 type Item interface {
-	Family(family string) ([]byte, error)
+	Family(family []byte) ([]byte, error)
 	Val() ([]byte, error)
 	ID() common.ItemID
 	SortedField() []byte
@@ -119,10 +119,10 @@ func (i *item) SortedField() []byte {
 	return i.sortedField
 }
 
-func (i *item) Family(family string) ([]byte, error) {
+func (i *item) Family(family []byte) ([]byte, error) {
 	d := dataBucket{
 		seriesID: i.seriesID,
-		family:   []byte(family),
+		family:   family,
 	}
 	return i.data.Get(d.marshal(), uint64(i.itemID))
 }
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 9d8d97d..6a405bf 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -175,7 +175,7 @@ func (d dataBucket) marshal() []byte {
 	}
 	return bytes.Join([][]byte{
 		d.seriesID.Marshal(),
-		hash(d.family),
+		d.family,
 	}, nil)
 }
 
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 4afb96f..d6ea39d 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -81,7 +81,7 @@ func NewPath(entries []Entry) Path {
 			p.template = append(p.template, zeroIntBytes...)
 			continue
 		}
-		entry := hash(e)
+		entry := Hash(e)
 		if !encounterAny {
 			p.offset += 8
 		}
@@ -105,7 +105,7 @@ func (p *Path) extractPrefix() {
 }
 
 func (p Path) Prepand(entry Entry) Path {
-	e := hash(entry)
+	e := Hash(entry)
 	var prepand = func(src []byte, entry []byte) []byte {
 		dst := make([]byte, len(src)+len(entry))
 		copy(dst, entry)
@@ -156,7 +156,7 @@ func (s *seriesDB) GetByHashKey(key []byte) (Series, error) {
 	}
 	s.Lock()
 	defer s.Unlock()
-	seriesID = hash(key)
+	seriesID = Hash(key)
 	err = s.seriesMetadata.Put(key, seriesID)
 	if err != nil {
 		return nil, err
@@ -277,7 +277,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string,
 func HashEntity(entity Entity) []byte {
 	result := make(Entry, 0, len(entity)*8)
 	for _, entry := range entity {
-		result = append(result, hash(entry)...)
+		result = append(result, Hash(entry)...)
 	}
 	return result
 }
@@ -286,7 +286,7 @@ func SeriesID(entity Entity) common.SeriesID {
 	return common.SeriesID(convert.Hash((HashEntity(entity))))
 }
 
-func hash(entry []byte) []byte {
+func Hash(entry []byte) []byte {
 	return convert.Uint64ToBytes(convert.Hash(entry))
 }
 
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 2838624..f7163dc 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -66,19 +66,19 @@ func TestNewPath(t *testing.T) {
 			want: Path{
 				isFull: true,
 				prefix: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -104,8 +104,8 @@ func TestNewPath(t *testing.T) {
 				}, nil),
 				template: bytes.Join([][]byte{
 					zeroIntBytes,
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					zeroIntBytes,
@@ -124,17 +124,17 @@ func TestNewPath(t *testing.T) {
 			},
 			want: Path{
 				prefix: bytes.Join([][]byte{
-					hash([]byte("productpage")),
+					Hash([]byte("productpage")),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("productpage")),
+					Hash([]byte("productpage")),
 					zeroIntBytes,
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("productpage")),
+					Hash([]byte("productpage")),
 					zeroIntBytes,
-					hash(convert.Uint64ToBytes(0)),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -153,17 +153,17 @@ func TestNewPath(t *testing.T) {
 			},
 			want: Path{
 				prefix: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 					zeroIntBytes,
 				}, nil),
 				mask: bytes.Join([][]byte{
@@ -185,22 +185,22 @@ func TestNewPath(t *testing.T) {
 			want: Path{
 				isFull: true,
 				prefix: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -220,18 +220,18 @@ func TestNewPath(t *testing.T) {
 			},
 			scope: Entry("segment"),
 			want: Path{
-				prefix: hash([]byte("segment")),
+				prefix: Hash([]byte("segment")),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("segment")),
+					Hash([]byte("segment")),
 					zeroIntBytes,
 					zeroIntBytes,
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("segment")),
+					Hash([]byte("segment")),
 					zeroIntBytes,
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -252,20 +252,20 @@ func TestNewPath(t *testing.T) {
 			scope: Entry("segment"),
 			want: Path{
 				prefix: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
 					zeroIntBytes,
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
 					zeroIntBytes,
-					hash(convert.Uint64ToBytes(0)),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -286,20 +286,20 @@ func TestNewPath(t *testing.T) {
 			scope: Entry("segment"),
 			want: Path{
 				prefix: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 					zeroIntBytes,
 				}, nil),
 				mask: bytes.Join([][]byte{
@@ -510,7 +510,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 		},
 	}
 	for _, d := range data {
-		d.id = common.SeriesID(convert.BytesToUint64(hash(HashEntity(d.entity))))
+		d.id = common.SeriesID(convert.BytesToUint64(Hash(HashEntity(d.entity))))
 		series, err := db.Get(d.entity)
 		t.NoError(err)
 		t.Greater(uint(series.ID()), uint(0))
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
index 6302f9e..9995f01 100644
--- a/pkg/bit/writer.go
+++ b/pkg/bit/writer.go
@@ -30,14 +30,18 @@ type Writer struct {
 
 // NewWriter create bit writer
 func NewWriter(buffer *bytes.Buffer) *Writer {
-	var bw Writer
+	bw := new(Writer)
 	bw.Reset(buffer)
-	return &bw
+	return bw
 }
 
 // Reset writes to a new writer
 func (w *Writer) Reset(buffer *bytes.Buffer) {
-	w.out = buffer
+	if buffer == nil {
+		w.out.Reset()
+	} else {
+		w.out = buffer
+	}
 	w.cache = 0
 	w.available = 8
 }
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index b8887ec..79fd69f 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -20,6 +20,9 @@ package encoding
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
+	"io"
+	"sync"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/pkg/bit"
@@ -27,6 +30,74 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
+var (
+	intEncoderPool = sync.Pool{
+		New: newIntEncoder,
+	}
+	intDecoderPool = sync.Pool{
+		New: func() interface{} {
+			return &intDecoder{}
+		},
+	}
+)
+
+type intEncoderPoolDelegator struct {
+	pool *sync.Pool
+	size int
+	fn   ParseInterval
+}
+
+func NewIntEncoderPool(size int, fn ParseInterval) SeriesEncoderPool {
+	return &intEncoderPoolDelegator{
+		pool: &intEncoderPool,
+		size: size,
+		fn:   fn,
+	}
+}
+
+func (b *intEncoderPoolDelegator) Get(metadata []byte) SeriesEncoder {
+	encoder := b.pool.Get().(*intEncoder)
+	encoder.size = b.size
+	encoder.fn = b.fn
+	encoder.Reset(metadata)
+	return encoder
+}
+
+func (b *intEncoderPoolDelegator) Put(encoder SeriesEncoder) {
+	_, ok := encoder.(*intEncoder)
+	if ok {
+		b.pool.Put(encoder)
+	}
+}
+
+type intDecoderPoolDelegator struct {
+	pool *sync.Pool
+	size int
+	fn   ParseInterval
+}
+
+func NewIntDecoderPool(size int, fn ParseInterval) SeriesDecoderPool {
+	return &intDecoderPoolDelegator{
+		pool: &intDecoderPool,
+		size: size,
+		fn:   fn,
+	}
+}
+
+func (b *intDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
+	decoder := b.pool.Get().(*intDecoder)
+	decoder.size = b.size
+	decoder.fn = b.fn
+	return decoder
+}
+
+func (b *intDecoderPoolDelegator) Put(decoder SeriesDecoder) {
+	_, ok := decoder.(*intDecoder)
+	if ok {
+		b.pool.Put(decoder)
+	}
+}
+
 var (
 	_ SeriesEncoder = (*intEncoder)(nil)
 )
@@ -40,19 +111,18 @@ type intEncoder struct {
 	fn        ParseInterval
 	interval  time.Duration
 	startTime uint64
+	prevTime  uint64
 	num       int
 	size      int
 }
 
-func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+func newIntEncoder() interface{} {
 	buff := &bytes.Buffer{}
 	bw := bit.NewWriter(buff)
 	return &intEncoder{
 		buff:   buff,
 		bw:     bw,
 		values: NewXOREncoder(bw),
-		fn:     fn,
-		size:   size,
 	}
 }
 
@@ -62,8 +132,9 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
 	}
 	if ie.startTime == 0 {
 		ie.startTime = ts
+		ie.prevTime = ts
 	}
-	gap := int(ts) - int(ie.startTime)
+	gap := int(ts) - int(ie.prevTime)
 	if gap < 0 {
 		return
 	}
@@ -72,8 +143,9 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
 		ie.bw.WriteBool(false)
 		ie.num++
 	}
+	ie.prevTime = ts
 	ie.bw.WriteBool(len(value) > 0)
-	ie.values.Write(binary.LittleEndian.Uint64(value))
+	ie.values.Write(convert.BytesToUint64(value))
 	ie.num++
 }
 
@@ -84,6 +156,8 @@ func (ie *intEncoder) IsFull() bool {
 func (ie *intEncoder) Reset(key []byte) {
 	ie.bw.Reset(nil)
 	ie.interval = ie.fn(key)
+	ie.startTime = 0
+	ie.prevTime = 0
 }
 
 func (ie *intEncoder) Encode() ([]byte, error) {
@@ -91,7 +165,7 @@ func (ie *intEncoder) Encode() ([]byte, error) {
 	buffWriter := buffer.NewBufferWriter(ie.buff)
 	buffWriter.PutUint64(ie.startTime)
 	buffWriter.PutUint16(uint16(ie.size))
-	return ie.buff.Bytes(), nil
+	return buffWriter.Bytes(), nil
 }
 
 func (ie *intEncoder) StartTime() uint64 {
@@ -109,14 +183,7 @@ type intDecoder struct {
 	area      []byte
 }
 
-func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
-	return &intDecoder{
-		fn:   fn,
-		size: size,
-	}
-}
-
-func (i intDecoder) Decode(key, data []byte) error {
+func (i *intDecoder) Decode(key, data []byte) error {
 	i.interval = i.fn(key)
 	i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
 	i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -148,14 +215,19 @@ func (i intDecoder) Iterator() SeriesIterator {
 		interval:  int(i.interval),
 		br:        br,
 		values:    NewXORDecoder(br),
+		size:      i.size,
 	}
 }
 
-var _ SeriesIterator = (*intIterator)(nil)
+var (
+	_    SeriesIterator = (*intIterator)(nil)
+	zero                = convert.BytesToUint64(convert.Int64ToBytes(0))
+)
 
 type intIterator struct {
 	startTime uint64
 	interval  int
+	size      int
 	br        *bit.Reader
 	values    *XORDecoder
 
@@ -166,17 +238,24 @@ type intIterator struct {
 }
 
 func (i *intIterator) Next() bool {
-	var b bool
-	b, i.err = i.br.ReadBool()
-	if i.err != nil {
+	if i.index >= i.size {
+		return false
+	}
+	b, err := i.br.ReadBool()
+	if errors.Is(err, io.EOF) {
+		return false
+	}
+	if err != nil {
+		i.err = err
 		return false
 	}
 	if b {
 		if i.values.Next() {
 			i.currVal = i.values.Value()
 		}
+	} else {
+		i.currVal = zero
 	}
-	i.currVal = 0
 	i.currTime = i.startTime + uint64(i.interval*i.index)
 	i.index++
 	return true
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
new file mode 100644
index 0000000..3c686d6
--- /dev/null
+++ b/pkg/encoding/int_test.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func TestNewIntEncoderAndDecoder(t *testing.T) {
+	type tsData struct {
+		ts   []uint64
+		data []int64
+	}
+	tests := []struct {
+		name string
+		args tsData
+		want tsData
+	}{
+		{
+			name: "golden path",
+			args: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 8, 7, 9},
+			},
+			want: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 8, 7, 9},
+			},
+		},
+		{
+			name: "more than the size",
+			args: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 8, 7, 9, 6},
+			},
+			want: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 8, 7, 9},
+			},
+		},
+		{
+			name: "less than the size",
+			args: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				data: []int64{7, 8, 7},
+			},
+			want: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				data: []int64{7, 8, 7},
+			},
+		},
+		{
+			name: "empty slot in the middle",
+			args: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 9},
+			},
+			want: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 0, 0, 9},
+			},
+		},
+	}
+	key := []byte("foo")
+	fn := func(k []byte) time.Duration {
+		assert.Equal(t, key, k)
+		return 1 * time.Minute
+	}
+	encoderPool := NewIntEncoderPool(3, fn)
+	decoderPool := NewIntDecoderPool(3, fn)
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			at := assert.New(t)
+			encoder := encoderPool.Get(key)
+			decoder := decoderPool.Get(key)
+			encoder.Reset(key)
+			for i, v := range tt.args.ts {
+				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
+				if encoder.IsFull() {
+					break
+				}
+			}
+			bb, err := encoder.Encode()
+			at.NoError(err)
+			at.NoError(decoder.Decode(key, bb))
+			at.True(decoder.IsFull())
+			iter := decoder.Iterator()
+			i := 0
+			for ; iter.Next(); i++ {
+				at.NoError(iter.Error())
+				at.Equal(tt.want.ts[i], iter.Time())
+				at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
+				v, err := decoder.Get(tt.want.ts[i])
+				at.NoError(err)
+				at.Equal(tt.want.data[i], convert.BytesToInt64(v))
+			}
+			if i == 0 {
+				at.Fail("empty data")
+			}
+		})
+	}
+}
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index e511441..626e3e9 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -31,59 +31,65 @@ import (
 )
 
 var (
-	encoderPool = sync.Pool{
+	plainEncoderPool = sync.Pool{
 		New: newPlainEncoder,
 	}
-	decoderPool = sync.Pool{
+	plainDecoderPool = sync.Pool{
 		New: func() interface{} {
 			return &plainDecoder{}
 		},
 	}
 )
 
-type plainEncoderPool struct {
+type plainEncoderPoolDelegator struct {
 	pool *sync.Pool
 	size int
 }
 
 func NewPlainEncoderPool(size int) SeriesEncoderPool {
-	return &plainEncoderPool{
-		pool: &encoderPool,
+	return &plainEncoderPoolDelegator{
+		pool: &plainEncoderPool,
 		size: size,
 	}
 }
 
-func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+func (b *plainEncoderPoolDelegator) Get(metadata []byte) SeriesEncoder {
 	encoder := b.pool.Get().(*plainEncoder)
 	encoder.Reset(metadata)
 	encoder.valueSize = b.size
 	return encoder
 }
 
-func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
-	b.pool.Put(encoder)
+func (b *plainEncoderPoolDelegator) Put(encoder SeriesEncoder) {
+	_, ok := encoder.(*plainEncoder)
+	if ok {
+		b.pool.Put(encoder)
+	}
 }
 
-type plainDecoderPool struct {
+type plainDecoderPoolDelegator struct {
 	pool *sync.Pool
 	size int
 }
 
 func NewPlainDecoderPool(size int) SeriesDecoderPool {
-	return &plainDecoderPool{
-		pool: &decoderPool,
+	return &plainDecoderPoolDelegator{
+		pool: &plainDecoderPool,
 		size: size,
 	}
 }
 
-func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+func (b *plainDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
 	decoder := b.pool.Get().(*plainDecoder)
 	decoder.valueSize = b.size
 	return decoder
 }
 
-func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
-	b.pool.Put(decoder)
+func (b *plainDecoderPoolDelegator) Put(decoder SeriesDecoder) {
+	_, ok := decoder.(*plainDecoder)
+	if ok {
+		b.pool.Put(decoder)
+	}
 }
 
 var (
diff --git a/pkg/test/measure/testdata/measures/service_cpm_day.json b/pkg/test/measure/testdata/measures/service_cpm_day.json
index 5e8bd75..0a4190d 100644
--- a/pkg/test/measure/testdata/measures/service_cpm_day.json
+++ b/pkg/test/measure/testdata/measures/service_cpm_day.json
@@ -37,6 +37,6 @@
       "entity_id"
     ]
   },
-  "interval": "1d",
+  "interval": "24h",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measures/service_instance_cpm_day.json b/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
index 9476244..36d17ce 100644
--- a/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
+++ b/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
@@ -42,6 +42,6 @@
       "entity_id"
     ]
   },
-  "interval": "1d",
+  "interval": "24h",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git a/pkg/timestamp/duration.go b/pkg/timestamp/duration.go
new file mode 100644
index 0000000..fd7fdbb
--- /dev/null
+++ b/pkg/timestamp/duration.go
@@ -0,0 +1,53 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timestamp
+
+import (
+	"strconv"
+	"strings"
+	"time"
+)
+
+// ParseDuration parses a duration string, extending time.ParseDuration
+// with a day unit "d" (24 hours). If s contains "d" after its first
+// character, only the optionally signed integer before the first "d" is
+// parsed, as a number of days; any text after that "d" is ignored.
+// Otherwise s is passed unchanged to time.ParseDuration.
+func ParseDuration(s string) (time.Duration, error) {
+	i := strings.Index(s, "d")
+	if i <= 0 {
+		return time.ParseDuration(s)
+	}
+	neg := false
+	if s != "" {
+		c := s[0]
+		if c == '-' || c == '+' {
+			neg = c == '-'
+			s = s[1:]
+			i--
+		}
+	}
+	d, err := strconv.Atoi(s[:i])
+	if neg {
+		d = -d
+	}
+	if err != nil {
+		return 0, err
+	}
+	return time.Hour * 24 * time.Duration(d), nil
+}
diff --git a/pkg/timestamp/duration_test.go b/pkg/timestamp/duration_test.go
new file mode 100644
index 0000000..1ce1d1b
--- /dev/null
+++ b/pkg/timestamp/duration_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timestamp
+
+import (
+	"testing"
+	"time"
+)
+
+func TestParseDuration(t *testing.T) {
+	tests := []struct {
+		name    string
+		arg     string
+		want    time.Duration
+		wantErr bool
+	}{
+		{
+			name: "one day",
+			arg:  "1d",
+			want: time.Hour * 24,
+		},
+		{
+			name: "negative one day",
+			arg:  "-1d",
+			want: -time.Hour * 24,
+		},
+		{
+			name: "5 hours",
+			arg:  "5h",
+			want: time.Hour * 5,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := ParseDuration(tt.arg)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("ParseDuration() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("ParseDuration() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}