You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/26 08:19:09 UTC

[skywalking-banyandb] branch patch-path created (now 7b32b88)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch patch-path
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at 7b32b88  Introduce gorilla encoder to measure

This branch includes the following new commits:

     new 7b32b88  Introduce gorilla encoder to measure

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Introduce gorilla encoder to measure

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch patch-path
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7b32b8894ec7d2d6f4bdbe69776d98210ca7fc84
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 26 08:11:42 2022 +0000

    Introduce gorilla encoder to measure
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .gitignore                                         |   3 +
 api/proto/banyandb/database/v1/schema.pb.go        |   1 +
 api/proto/banyandb/database/v1/schema.proto        |   1 +
 banyand/liaison/grpc/measure_test.go               |  17 +--
 banyand/liaison/grpc/stream_test.go                |  15 ++-
 banyand/measure/encode.go                          | 123 +++++++++++++++++++++
 banyand/measure/field_flag_test.go                 |  39 +++++++
 banyand/measure/measure.go                         |  13 ++-
 banyand/measure/measure_query.go                   |   4 +-
 banyand/measure/measure_write.go                   |  19 +---
 banyand/measure/metadata.go                        |   5 +-
 banyand/stream/stream_query.go                     |   2 +-
 banyand/stream/stream_write.go                     |   2 +-
 banyand/tsdb/series_seek.go                        |   6 +-
 banyand/tsdb/series_write.go                       |   2 +-
 banyand/tsdb/seriesdb.go                           |  10 +-
 banyand/tsdb/seriesdb_test.go                      | 110 +++++++++---------
 pkg/bit/writer.go                                  |  10 +-
 pkg/encoding/int.go                                | 117 ++++++++++++++++----
 pkg/encoding/int_test.go                           | 123 +++++++++++++++++++++
 pkg/encoding/plain.go                              |  34 +++---
 .../measure/testdata/measures/service_cpm_day.json |   2 +-
 .../measures/service_instance_cpm_day.json         |   2 +-
 pkg/timestamp/duration.go                          |  53 +++++++++
 pkg/timestamp/duration_test.go                     |  60 ++++++++++
 25 files changed, 636 insertions(+), 137 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1b3617e..2e3a8ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,6 @@ build/release/
 # mock files
 *mock.go
 *mock_test.go
+
+# etcd unix sockets
+localhost:*
diff --git a/api/proto/banyandb/database/v1/schema.pb.go b/api/proto/banyandb/database/v1/schema.pb.go
index e9b503a..b49fbef 100644
--- a/api/proto/banyandb/database/v1/schema.pb.go
+++ b/api/proto/banyandb/database/v1/schema.pb.go
@@ -671,6 +671,7 @@ type Measure struct {
 	// entity indicates which tags will be to generate a series and shard a measure
 	Entity *Entity `protobuf:"bytes,4,opt,name=entity,proto3" json:"entity,omitempty"`
 	// interval indicates how frequently to send a data point
+	// valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
 	Interval string `protobuf:"bytes,5,opt,name=interval,proto3" json:"interval,omitempty"`
 	// updated_at indicates when the measure is updated
 	UpdatedAt *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"`
diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index 41df79d..86b9cad 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -103,6 +103,7 @@ message Measure {
     // entity indicates which tags will be to generate a series and shard a measure
     Entity entity = 4;
     // interval indicates how frequently to send a data point
+    // valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d".
     string interval = 5;
     // updated_at indicates when the measure is updated
     google.protobuf.Timestamp updated_at = 6;
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index 27fd27d..22289cc 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -38,19 +38,21 @@ import (
 )
 
 var _ = Describe("Measure", func() {
-	var rootPath, metadataPath string
-	var gracefulStop, deferRootFunc, deferMetadataFunc func()
+	var streamPath, measurePath, metadataPath string
+	var gracefulStop, deferStreamFunc, deferMeasureFunc, deferMetadataFunc func()
 	var conn *grpclib.ClientConn
 	BeforeEach(func() {
 		var err error
-		rootPath, deferRootFunc, err = test.NewSpace()
+		streamPath, deferStreamFunc, err = test.NewSpace()
+		Expect(err).NotTo(HaveOccurred())
+		measurePath, deferMeasureFunc, err = test.NewSpace()
 		Expect(err).NotTo(HaveOccurred())
 		metadataPath, deferMetadataFunc, err = test.NewSpace()
 		Expect(err).NotTo(HaveOccurred())
 	})
-	It("is a plain server", func() {
+	FIt("is a plain server", func() {
 		By("Verifying an empty server")
-		flags := []string{"--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
 		gracefulStop = setup(flags)
 		var err error
 		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
@@ -75,7 +77,7 @@ var _ = Describe("Measure", func() {
 		}, defaultEventallyTimeout).Should(Equal(1))
 	})
 	It("is a TLS server", func() {
-		flags := []string{"--tls=true", "--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--tls=true", "--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
 		_, currentFile, _, _ := runtime.Caller(0)
 		basePath := filepath.Dir(currentFile)
 		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -98,7 +100,8 @@ var _ = Describe("Measure", func() {
 		_ = conn.Close()
 		gracefulStop()
 		deferMetadataFunc()
-		deferRootFunc()
+		deferStreamFunc()
+		deferMeasureFunc()
 	})
 })
 
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 8f02215..5e52e1b 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -45,19 +45,21 @@ import (
 )
 
 var _ = Describe("Stream", func() {
-	var rootPath, metadataPath string
-	var gracefulStop, deferRootFunc, deferMetadataFunc func()
+	var streamPath, measurePath, metadataPath string
+	var gracefulStop, deferStreamFunc, deferMeasureFunc, deferMetadataFunc func()
 	var conn *grpclib.ClientConn
 	BeforeEach(func() {
 		var err error
-		rootPath, deferRootFunc, err = test.NewSpace()
+		streamPath, deferStreamFunc, err = test.NewSpace()
+		Expect(err).NotTo(HaveOccurred())
+		measurePath, deferMeasureFunc, err = test.NewSpace()
 		Expect(err).NotTo(HaveOccurred())
 		metadataPath, deferMetadataFunc, err = test.NewSpace()
 		Expect(err).NotTo(HaveOccurred())
 	})
 	It("is a plain server", func() {
 		By("Verifying an empty server")
-		flags := []string{"--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
 		gracefulStop = setup(flags)
 		var err error
 		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
@@ -82,7 +84,7 @@ var _ = Describe("Stream", func() {
 		}, defaultEventallyTimeout).Should(Equal(1))
 	})
 	It("is a TLS server", func() {
-		flags := []string{"--tls=true", "--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--tls=true", "--stream-root-path=" + streamPath, "--measure-root-path=" + measurePath, "--metadata-root-path=" + metadataPath}
 		_, currentFile, _, _ := runtime.Caller(0)
 		basePath := filepath.Dir(currentFile)
 		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -105,7 +107,8 @@ var _ = Describe("Stream", func() {
 		_ = conn.Close()
 		gracefulStop()
 		deferMetadataFunc()
-		deferRootFunc()
+		deferStreamFunc()
+		deferMeasureFunc()
 	})
 })
 
diff --git a/banyand/measure/encode.go b/banyand/measure/encode.go
new file mode 100644
index 0000000..3d6fcf3
--- /dev/null
+++ b/banyand/measure/encode.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+	"time"
+
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	_          encoding.SeriesEncoderPool = (*encoderPool)(nil)
+	_          encoding.SeriesDecoderPool = (*decoderPool)(nil)
+	intervalFn                            = func(key []byte) time.Duration {
+		_, interval, err := decodeFieldFlag(key)
+		if err != nil {
+			panic(err)
+		}
+		return interval
+	}
+)
+
+type encoderPool struct {
+	intPool     encoding.SeriesEncoderPool
+	defaultPool encoding.SeriesEncoderPool
+	l           *logger.Logger
+}
+
+func newEncoderPool(size int, l *logger.Logger) encoding.SeriesEncoderPool {
+	return &encoderPool{
+		intPool:     encoding.NewIntEncoderPool(size, intervalFn),
+		defaultPool: encoding.NewPlainEncoderPool(size),
+		l:           l,
+	}
+}
+
+func (p *encoderPool) Get(metadata []byte) encoding.SeriesEncoder {
+	fieldSpec, _, err := decodeFieldFlag(metadata)
+	if err != nil {
+		p.l.Err(err).Msg("failed to decode field flag")
+		return p.defaultPool.Get(metadata)
+	}
+	if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
+		return p.intPool.Get(metadata)
+	}
+	return p.defaultPool.Get(metadata)
+}
+
+func (p *encoderPool) Put(encoder encoding.SeriesEncoder) {
+	p.intPool.Put(encoder)
+	p.defaultPool.Put(encoder)
+}
+
+type decoderPool struct {
+	intPool     encoding.SeriesDecoderPool
+	defaultPool encoding.SeriesDecoderPool
+	l           *logger.Logger
+}
+
+func newDecoderPool(size int, l *logger.Logger) encoding.SeriesDecoderPool {
+	return &decoderPool{
+		intPool:     encoding.NewIntDecoderPool(size, intervalFn),
+		defaultPool: encoding.NewPlainDecoderPool(size),
+		l:           l,
+	}
+}
+
+func (p *decoderPool) Get(metadata []byte) encoding.SeriesDecoder {
+	fieldSpec, _, err := decodeFieldFlag(metadata)
+	if err != nil {
+		p.l.Err(err).Msg("failed to decode field flag")
+		return p.defaultPool.Get(metadata)
+	}
+	if fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA {
+		return p.intPool.Get(metadata)
+	}
+	return p.defaultPool.Get(metadata)
+}
+
+func (p *decoderPool) Put(decoder encoding.SeriesDecoder) {
+	p.intPool.Put(decoder)
+	p.defaultPool.Put(decoder)
+}
+
+const fieldFlagLength = 9
+
+func encoderFieldFlag(fieldSpec *databasev1.FieldSpec, interval time.Duration) []byte {
+	encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
+	compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
+	bb := make([]byte, fieldFlagLength)
+	bb[0] = encodingMethod<<4 | compressionMethod
+	copy(bb[1:], convert.Int64ToBytes(int64(interval)))
+	return bb
+}
+
+func decodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) {
+	if len(key) < fieldFlagLength {
+		return nil, 0, ErrMalformedFieldFlag
+	}
+	b := key[len(key)-9:]
+	return &databasev1.FieldSpec{
+		EncodingMethod:    databasev1.EncodingMethod(int32(b[0]) >> 4),
+		CompressionMethod: databasev1.CompressionMethod((int32(b[0] & 0x0F))),
+	}, time.Duration(convert.BytesToInt64(b[1:])), nil
+}
diff --git a/banyand/measure/field_flag_test.go b/banyand/measure/field_flag_test.go
new file mode 100644
index 0000000..baec5b8
--- /dev/null
+++ b/banyand/measure/field_flag_test.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package measure
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+func TestEncodeFieldFlag(t *testing.T) {
+	flag := encoderFieldFlag(&databasev1.FieldSpec{
+		EncodingMethod:    databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+		CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+	}, time.Minute)
+	fieldSpec, interval, err := decodeFieldFlag(flag)
+	assert.NoError(t, err)
+	assert.Equal(t, databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, fieldSpec.EncodingMethod)
+	assert.Equal(t, databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, fieldSpec.CompressionMethod)
+	assert.Equal(t, time.Minute, interval)
+}
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 481197d..d9688c9 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -19,6 +19,7 @@ package measure
 
 import (
 	"context"
+	"time"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -27,6 +28,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 // a chunk is 1MB
@@ -44,6 +46,7 @@ type measure struct {
 	entityLocator          partition.EntityLocator
 	indexRules             []*databasev1.IndexRule
 	indexWriter            *index.Writer
+	interval               time.Duration
 }
 
 func (s *measure) GetSchema() *databasev1.Measure {
@@ -70,10 +73,14 @@ func (s *measure) Close() error {
 	return s.indexWriter.Close()
 }
 
-func (s *measure) parseSpec() {
+func (s *measure) parseSpec() (err error) {
 	s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
 	s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
 	s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+	if s.schema.Interval != "" {
+		s.interval, err = timestamp.ParseDuration(s.schema.Interval)
+	}
+	return err
 }
 
 type measureSpec struct {
@@ -88,7 +95,9 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
 		indexRules: spec.indexRules,
 		l:          l,
 	}
-	sm.parseSpec()
+	if err := sm.parseSpec(); err != nil {
+		return nil, err
+	}
 	ctx := context.WithValue(context.Background(), logger.ContextKey, l)
 
 	sm.db = db
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 4d482b2..2d9473f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -91,7 +91,7 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
-	familyRawBytes, err := item.Family(string(familyIdentity(family, TagFlag)))
+	familyRawBytes, err := item.Family(familyIdentity(family, TagFlag))
 	if err != nil {
 		return nil, err
 	}
@@ -132,7 +132,7 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
 			break
 		}
 	}
-	bytes, err := item.Family(string(familyIdentity(name, encoderFieldFlag(fieldSpec))))
+	bytes, err := item.Family(familyIdentity(name, encoderFieldFlag(fieldSpec, s.interval)))
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index d5f3db8..efb5ac5 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -37,11 +37,10 @@ import (
 )
 
 var (
-	ErrMalformedElement = errors.New("element is malformed")
-)
+	ErrMalformedElement   = errors.New("element is malformed")
+	ErrMalformedFieldFlag = errors.New("field flag is malformed")
 
-const (
-	TagFlag byte = iota
+	TagFlag []byte = make([]byte, fieldFlagLength)
 )
 
 func (s *measure) Write(value *measurev1.DataPointValue) error {
@@ -129,7 +128,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 			if data == nil {
 				continue
 			}
-			builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec)), data)
+			builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), encoderFieldFlag(fieldSpec, s.interval)), data)
 		}
 		writer, errWrite := builder.Build()
 		if errWrite != nil {
@@ -196,8 +195,8 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 	return
 }
 
-func familyIdentity(name string, flag byte) []byte {
-	return bytes.Join([][]byte{[]byte(name), {flag}}, nil)
+func familyIdentity(name string, flag []byte) []byte {
+	return bytes.Join([][]byte{tsdb.Hash([]byte(name)), flag}, nil)
 }
 
 func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte {
@@ -223,9 +222,3 @@ func decodeFieldValue(fieldValue []byte, fieldSpec *databasev1.FieldSpec) *model
 	}
 	return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
 }
-
-func encoderFieldFlag(fieldSpec *databasev1.FieldSpec) byte {
-	encodingMethod := byte(fieldSpec.GetEncodingMethod().Number())
-	compressionMethod := byte(fieldSpec.GetCompressionMethod().Number())
-	return encodingMethod<<4 | compressionMethod
-}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 0499083..14fae45 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -30,7 +30,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
-	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
@@ -205,8 +204,8 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
 			Location: path.Join(s.path, groupSchema.Metadata.Name),
 			ShardNum: groupSchema.ResourceOpts.ShardNum,
 			EncodingMethod: tsdb.EncodingMethod{
-				EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
-				DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+				EncoderPool: newEncoderPool(chunkSize, s.l),
+				DecoderPool: newDecoderPool(chunkSize, s.l),
 			},
 		})
 }
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 493966f..97640e8 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -88,7 +88,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
-	familyRawBytes, err := item.Family(family)
+	familyRawBytes, err := item.Family(tsdb.Hash([]byte(family)))
 	if err != nil {
 		return nil, errors.Wrapf(err, "parse family %s", family)
 	}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index d73130e..c8d7ac7 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -102,7 +102,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 			if errMarshal != nil {
 				return nil, errMarshal
 			}
-			builder.Family([]byte(sm.GetTagFamilies()[fi].GetName()), bb)
+			builder.Family(tsdb.Hash([]byte(sm.GetTagFamilies()[fi].GetName())), bb)
 		}
 		builder.Val([]byte(value.GetElementId()))
 		writer, errWrite := builder.Build()
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index eec6104..9051ed4 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -32,7 +32,7 @@ type Iterator interface {
 }
 
 type Item interface {
-	Family(family string) ([]byte, error)
+	Family(family []byte) ([]byte, error)
 	Val() ([]byte, error)
 	ID() common.ItemID
 	SortedField() []byte
@@ -119,10 +119,10 @@ func (i *item) SortedField() []byte {
 	return i.sortedField
 }
 
-func (i *item) Family(family string) ([]byte, error) {
+func (i *item) Family(family []byte) ([]byte, error) {
 	d := dataBucket{
 		seriesID: i.seriesID,
-		family:   []byte(family),
+		family:   family,
 	}
 	return i.data.Get(d.marshal(), uint64(i.itemID))
 }
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 9d8d97d..6a405bf 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -175,7 +175,7 @@ func (d dataBucket) marshal() []byte {
 	}
 	return bytes.Join([][]byte{
 		d.seriesID.Marshal(),
-		hash(d.family),
+		d.family,
 	}, nil)
 }
 
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 4afb96f..d6ea39d 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -81,7 +81,7 @@ func NewPath(entries []Entry) Path {
 			p.template = append(p.template, zeroIntBytes...)
 			continue
 		}
-		entry := hash(e)
+		entry := Hash(e)
 		if !encounterAny {
 			p.offset += 8
 		}
@@ -105,7 +105,7 @@ func (p *Path) extractPrefix() {
 }
 
 func (p Path) Prepand(entry Entry) Path {
-	e := hash(entry)
+	e := Hash(entry)
 	var prepand = func(src []byte, entry []byte) []byte {
 		dst := make([]byte, len(src)+len(entry))
 		copy(dst, entry)
@@ -156,7 +156,7 @@ func (s *seriesDB) GetByHashKey(key []byte) (Series, error) {
 	}
 	s.Lock()
 	defer s.Unlock()
-	seriesID = hash(key)
+	seriesID = Hash(key)
 	err = s.seriesMetadata.Put(key, seriesID)
 	if err != nil {
 		return nil, err
@@ -277,7 +277,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string,
 func HashEntity(entity Entity) []byte {
 	result := make(Entry, 0, len(entity)*8)
 	for _, entry := range entity {
-		result = append(result, hash(entry)...)
+		result = append(result, Hash(entry)...)
 	}
 	return result
 }
@@ -286,7 +286,7 @@ func SeriesID(entity Entity) common.SeriesID {
 	return common.SeriesID(convert.Hash((HashEntity(entity))))
 }
 
-func hash(entry []byte) []byte {
+func Hash(entry []byte) []byte {
 	return convert.Uint64ToBytes(convert.Hash(entry))
 }
 
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 2838624..f7163dc 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -66,19 +66,19 @@ func TestNewPath(t *testing.T) {
 			want: Path{
 				isFull: true,
 				prefix: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -104,8 +104,8 @@ func TestNewPath(t *testing.T) {
 				}, nil),
 				template: bytes.Join([][]byte{
 					zeroIntBytes,
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					zeroIntBytes,
@@ -124,17 +124,17 @@ func TestNewPath(t *testing.T) {
 			},
 			want: Path{
 				prefix: bytes.Join([][]byte{
-					hash([]byte("productpage")),
+					Hash([]byte("productpage")),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("productpage")),
+					Hash([]byte("productpage")),
 					zeroIntBytes,
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("productpage")),
+					Hash([]byte("productpage")),
 					zeroIntBytes,
-					hash(convert.Uint64ToBytes(0)),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -153,17 +153,17 @@ func TestNewPath(t *testing.T) {
 			},
 			want: Path{
 				prefix: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 					zeroIntBytes,
 				}, nil),
 				mask: bytes.Join([][]byte{
@@ -185,22 +185,22 @@ func TestNewPath(t *testing.T) {
 			want: Path{
 				isFull: true,
 				prefix: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -220,18 +220,18 @@ func TestNewPath(t *testing.T) {
 			},
 			scope: Entry("segment"),
 			want: Path{
-				prefix: hash([]byte("segment")),
+				prefix: Hash([]byte("segment")),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("segment")),
+					Hash([]byte("segment")),
 					zeroIntBytes,
 					zeroIntBytes,
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("segment")),
+					Hash([]byte("segment")),
 					zeroIntBytes,
-					hash([]byte("10.0.0.1")),
-					hash(convert.Uint64ToBytes(0)),
+					Hash([]byte("10.0.0.1")),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -252,20 +252,20 @@ func TestNewPath(t *testing.T) {
 			scope: Entry("segment"),
 			want: Path{
 				prefix: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
 					zeroIntBytes,
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
 					zeroIntBytes,
-					hash(convert.Uint64ToBytes(0)),
+					Hash(convert.Uint64ToBytes(0)),
 				}, nil),
 				mask: bytes.Join([][]byte{
 					maxIntBytes,
@@ -286,20 +286,20 @@ func TestNewPath(t *testing.T) {
 			scope: Entry("segment"),
 			want: Path{
 				prefix: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 				}, nil),
 				seekKey: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 					zeroIntBytes,
 				}, nil),
 				template: bytes.Join([][]byte{
-					hash([]byte("segment")),
-					hash([]byte("productpage")),
-					hash([]byte("10.0.0.1")),
+					Hash([]byte("segment")),
+					Hash([]byte("productpage")),
+					Hash([]byte("10.0.0.1")),
 					zeroIntBytes,
 				}, nil),
 				mask: bytes.Join([][]byte{
@@ -510,7 +510,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 		},
 	}
 	for _, d := range data {
-		d.id = common.SeriesID(convert.BytesToUint64(hash(HashEntity(d.entity))))
+		d.id = common.SeriesID(convert.BytesToUint64(Hash(HashEntity(d.entity))))
 		series, err := db.Get(d.entity)
 		t.NoError(err)
 		t.Greater(uint(series.ID()), uint(0))
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
index 6302f9e..9995f01 100644
--- a/pkg/bit/writer.go
+++ b/pkg/bit/writer.go
@@ -30,14 +30,18 @@ type Writer struct {
 
 // NewWriter create bit writer
 func NewWriter(buffer *bytes.Buffer) *Writer {
-	var bw Writer
+	bw := new(Writer)
 	bw.Reset(buffer)
-	return &bw
+	return bw
 }
 
 // Reset writes to a new writer
 func (w *Writer) Reset(buffer *bytes.Buffer) {
-	w.out = buffer
+	if buffer == nil {
+		w.out.Reset()
+	} else {
+		w.out = buffer
+	}
 	w.cache = 0
 	w.available = 8
 }
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index b8887ec..79fd69f 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -20,6 +20,9 @@ package encoding
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
+	"io"
+	"sync"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/pkg/bit"
@@ -27,6 +30,74 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
+var (
+	intEncoderPool = sync.Pool{
+		New: newIntEncoder,
+	}
+	intDecoderPool = sync.Pool{
+		New: func() interface{} {
+			return &intDecoder{}
+		},
+	}
+)
+
+type intEncoderPoolDelegator struct {
+	pool *sync.Pool
+	size int
+	fn   ParseInterval
+}
+
+func NewIntEncoderPool(size int, fn ParseInterval) SeriesEncoderPool {
+	return &intEncoderPoolDelegator{
+		pool: &intEncoderPool,
+		size: size,
+		fn:   fn,
+	}
+}
+
+func (b *intEncoderPoolDelegator) Get(metadata []byte) SeriesEncoder {
+	encoder := b.pool.Get().(*intEncoder)
+	encoder.size = b.size
+	encoder.fn = b.fn
+	encoder.Reset(metadata)
+	return encoder
+}
+
+func (b *intEncoderPoolDelegator) Put(encoder SeriesEncoder) {
+	_, ok := encoder.(*intEncoder)
+	if ok {
+		b.pool.Put(encoder)
+	}
+}
+
+type intDecoderPoolDelegator struct {
+	pool *sync.Pool
+	size int
+	fn   ParseInterval
+}
+
+func NewIntDecoderPool(size int, fn ParseInterval) SeriesDecoderPool {
+	return &intDecoderPoolDelegator{
+		pool: &intDecoderPool,
+		size: size,
+		fn:   fn,
+	}
+}
+
+func (b *intDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
+	decoder := b.pool.Get().(*intDecoder)
+	decoder.size = b.size
+	decoder.fn = b.fn
+	return decoder
+}
+
+func (b *intDecoderPoolDelegator) Put(decoder SeriesDecoder) {
+	_, ok := decoder.(*intDecoder)
+	if ok {
+		b.pool.Put(decoder)
+	}
+}
+
 var (
 	_ SeriesEncoder = (*intEncoder)(nil)
 )
@@ -40,19 +111,18 @@ type intEncoder struct {
 	fn        ParseInterval
 	interval  time.Duration
 	startTime uint64
+	prevTime  uint64
 	num       int
 	size      int
 }
 
-func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+func newIntEncoder() interface{} {
 	buff := &bytes.Buffer{}
 	bw := bit.NewWriter(buff)
 	return &intEncoder{
 		buff:   buff,
 		bw:     bw,
 		values: NewXOREncoder(bw),
-		fn:     fn,
-		size:   size,
 	}
 }
 
@@ -62,8 +132,9 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
 	}
 	if ie.startTime == 0 {
 		ie.startTime = ts
+		ie.prevTime = ts
 	}
-	gap := int(ts) - int(ie.startTime)
+	gap := int(ts) - int(ie.prevTime)
 	if gap < 0 {
 		return
 	}
@@ -72,8 +143,9 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
 		ie.bw.WriteBool(false)
 		ie.num++
 	}
+	ie.prevTime = ts
 	ie.bw.WriteBool(len(value) > 0)
-	ie.values.Write(binary.LittleEndian.Uint64(value))
+	ie.values.Write(convert.BytesToUint64(value))
 	ie.num++
 }
 
@@ -84,6 +156,8 @@ func (ie *intEncoder) IsFull() bool {
 func (ie *intEncoder) Reset(key []byte) {
 	ie.bw.Reset(nil)
 	ie.interval = ie.fn(key)
+	ie.startTime, ie.prevTime, ie.num = 0, 0, 0 // also clear the slot counter so a reused encoder starts empty
+	ie.values = NewXOREncoder(ie.bw)            // presumably stateful (previous value); start a fresh XOR stream
 }
 
 func (ie *intEncoder) Encode() ([]byte, error) {
@@ -91,7 +165,7 @@ func (ie *intEncoder) Encode() ([]byte, error) {
 	buffWriter := buffer.NewBufferWriter(ie.buff)
 	buffWriter.PutUint64(ie.startTime)
 	buffWriter.PutUint16(uint16(ie.size))
-	return ie.buff.Bytes(), nil
+	return buffWriter.Bytes(), nil
 }
 
 func (ie *intEncoder) StartTime() uint64 {
@@ -109,14 +183,7 @@ type intDecoder struct {
 	area      []byte
 }
 
-func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
-	return &intDecoder{
-		fn:   fn,
-		size: size,
-	}
-}
-
-func (i intDecoder) Decode(key, data []byte) error {
+func (i *intDecoder) Decode(key, data []byte) error {
 	i.interval = i.fn(key)
 	i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
 	i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -148,14 +215,19 @@ func (i intDecoder) Iterator() SeriesIterator {
 		interval:  int(i.interval),
 		br:        br,
 		values:    NewXORDecoder(br),
+		size:      i.size,
 	}
 }
 
-var _ SeriesIterator = (*intIterator)(nil)
+var (
+	_    SeriesIterator = (*intIterator)(nil)
+	zero                = convert.BytesToUint64(convert.Int64ToBytes(0))
+)
 
 type intIterator struct {
 	startTime uint64
 	interval  int
+	size      int
 	br        *bit.Reader
 	values    *XORDecoder
 
@@ -166,17 +238,24 @@ type intIterator struct {
 }
 
 func (i *intIterator) Next() bool {
-	var b bool
-	b, i.err = i.br.ReadBool()
-	if i.err != nil {
+	if i.index >= i.size {
+		return false
+	}
+	b, err := i.br.ReadBool()
+	if errors.Is(err, io.EOF) {
+		return false
+	}
+	if err != nil {
+		i.err = err
 		return false
 	}
 	if b {
 		if i.values.Next() {
 			i.currVal = i.values.Value()
 		}
+	} else {
+		i.currVal = zero
 	}
-	i.currVal = 0
 	i.currTime = i.startTime + uint64(i.interval*i.index)
 	i.index++
 	return true
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
new file mode 100644
index 0000000..3c686d6
--- /dev/null
+++ b/pkg/encoding/int_test.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func TestNewIntEncoderAndDecoder(t *testing.T) {
+	type tsData struct {
+		ts   []uint64
+		data []int64
+	}
+	tests := []struct {
+		name string
+		args tsData
+		want tsData
+	}{
+		{
+			name: "golden path",
+			args: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 8, 7, 9},
+			},
+			want: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 8, 7, 9},
+			},
+		},
+		{
+			name: "more than the size",
+			args: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 8, 7, 9, 6},
+			},
+			want: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 8, 7, 9},
+			},
+		},
+		{
+			name: "less than the size",
+			args: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				data: []int64{7, 8, 7},
+			},
+			want: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				data: []int64{7, 8, 7},
+			},
+		},
+		{
+			name: "empty slot in the middle",
+			args: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 9},
+			},
+			want: tsData{
+				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				data: []int64{7, 0, 0, 9},
+			},
+		},
+	}
+	key := []byte("foo")
+	fn := func(k []byte) time.Duration {
+		assert.Equal(t, key, k)
+		return 1 * time.Minute
+	}
+	encoderPool := NewIntEncoderPool(3, fn)
+	decoderPool := NewIntDecoderPool(3, fn)
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			at := assert.New(t)
+			encoder := encoderPool.Get(key)
+			decoder := decoderPool.Get(key)
+			encoder.Reset(key)
+			for i, v := range tt.args.ts {
+				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
+				if encoder.IsFull() {
+					break
+				}
+			}
+			bb, err := encoder.Encode()
+			at.NoError(err)
+			at.NoError(decoder.Decode(key, bb))
+			at.True(decoder.IsFull())
+			iter := decoder.Iterator()
+			i := 0
+			for ; iter.Next(); i++ {
+				at.NoError(iter.Error())
+				at.Equal(tt.want.ts[i], iter.Time())
+				at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
+				v, err := decoder.Get(tt.want.ts[i])
+				at.NoError(err)
+				at.Equal(tt.want.data[i], convert.BytesToInt64(v))
+			}
+			if i == 0 {
+				at.Fail("empty data")
+			}
+		})
+	}
+}
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index e511441..626e3e9 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -31,59 +31,65 @@ import (
 )
 
 var (
-	encoderPool = sync.Pool{
+	plainEncoderPool = sync.Pool{
 		New: newPlainEncoder,
 	}
-	decoderPool = sync.Pool{
+	plainDecoderPool = sync.Pool{
 		New: func() interface{} {
 			return &plainDecoder{}
 		},
 	}
 )
 
-type plainEncoderPool struct {
+type plainEncoderPoolDelegator struct {
 	pool *sync.Pool
 	size int
 }
 
 func NewPlainEncoderPool(size int) SeriesEncoderPool {
-	return &plainEncoderPool{
-		pool: &encoderPool,
+	return &plainEncoderPoolDelegator{
+		pool: &plainEncoderPool,
 		size: size,
 	}
 }
 
-func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+func (b *plainEncoderPoolDelegator) Get(metadata []byte) SeriesEncoder {
 	encoder := b.pool.Get().(*plainEncoder)
 	encoder.Reset(metadata)
 	encoder.valueSize = b.size
 	return encoder
 }
 
-func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
-	b.pool.Put(encoder)
+func (b *plainEncoderPoolDelegator) Put(encoder SeriesEncoder) {
+	_, ok := encoder.(*plainEncoder)
+	if ok {
+		b.pool.Put(encoder)
+	}
 }
 
-type plainDecoderPool struct {
+type plainDecoderPoolDelegator struct {
 	pool *sync.Pool
 	size int
 }
 
 func NewPlainDecoderPool(size int) SeriesDecoderPool {
-	return &plainDecoderPool{
-		pool: &decoderPool,
+	return &plainDecoderPoolDelegator{
+		pool: &plainDecoderPool,
 		size: size,
 	}
 }
 
-func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+func (b *plainDecoderPoolDelegator) Get(_ []byte) SeriesDecoder {
 	decoder := b.pool.Get().(*plainDecoder)
 	decoder.valueSize = b.size
 	return decoder
 }
 
-func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
-	b.pool.Put(decoder)
+func (b *plainDecoderPoolDelegator) Put(decoder SeriesDecoder) {
+	_, ok := decoder.(*plainDecoder)
+	if ok {
+		b.pool.Put(decoder)
+	}
 }
 
 var (
diff --git a/pkg/test/measure/testdata/measures/service_cpm_day.json b/pkg/test/measure/testdata/measures/service_cpm_day.json
index 5e8bd75..0a4190d 100644
--- a/pkg/test/measure/testdata/measures/service_cpm_day.json
+++ b/pkg/test/measure/testdata/measures/service_cpm_day.json
@@ -37,6 +37,6 @@
       "entity_id"
     ]
   },
-  "interval": "1d",
+  "interval": "24h",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measures/service_instance_cpm_day.json b/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
index 9476244..36d17ce 100644
--- a/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
+++ b/pkg/test/measure/testdata/measures/service_instance_cpm_day.json
@@ -42,6 +42,6 @@
       "entity_id"
     ]
   },
-  "interval": "1d",
+  "interval": "24h",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
\ No newline at end of file
diff --git a/pkg/timestamp/duration.go b/pkg/timestamp/duration.go
new file mode 100644
index 0000000..fd7fdbb
--- /dev/null
+++ b/pkg/timestamp/duration.go
@@ -0,0 +1,53 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timestamp
+
+import (
+	"strconv"
+	"strings"
+	"time"
+)
+
+// ParseDuration parses a duration string. It accepts everything
+// time.ParseDuration does and, in addition, an optionally signed whole number
+// of days in front, e.g. "1d", "-2d" or "1d12h" (a day is exactly 24 hours).
+// Anything after the "d" must itself be a valid unsigned duration or be empty.
+func ParseDuration(s string) (time.Duration, error) {
+	i := strings.Index(s, "d")
+	if i <= 0 {
+		return time.ParseDuration(s) // no day component
+	}
+	sign := time.Duration(1)
+	if s[0] == '-' || s[0] == '+' {
+		if s[0] == '-' {
+			sign = -1
+		}
+		s, i = s[1:], i-1
+	}
+	// ParseUint rejects a second sign; 16 bits keeps days*24h far from overflow.
+	days, err := strconv.ParseUint(s[:i], 10, 16)
+	if err != nil {
+		return 0, err
+	}
+	// The "0s" prefix makes an empty tail valid and a signed tail invalid.
+	rest, err := time.ParseDuration("0s" + s[i+1:])
+	if err != nil {
+		return 0, err
+	}
+	return sign * (24*time.Hour*time.Duration(days) + rest), nil
+}
diff --git a/pkg/timestamp/duration_test.go b/pkg/timestamp/duration_test.go
new file mode 100644
index 0000000..1ce1d1b
--- /dev/null
+++ b/pkg/timestamp/duration_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timestamp
+
+import (
+	"testing"
+	"time"
+)
+
+func TestParseDuration(t *testing.T) {
+	tests := []struct {
+		name    string
+		arg     string
+		want    time.Duration
+		wantErr bool
+	}{
+		{
+			name: "one day",
+			arg:  "1d",
+			want: time.Hour * 24,
+		},
+		{
+			name: "negative one day",
+			arg:  "-1d",
+			want: -time.Hour * 24,
+		},
+		{
+			name: "5 hours",
+			arg:  "5h",
+			want: time.Hour * 5,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := ParseDuration(tt.arg)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("ParseDuration() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("ParseDuration() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}