You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/11/20 02:30:08 UTC

[skywalking-banyandb] 01/02: Introduce gorilla encoding for integer values

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 08a9b4e815e655de3cef9172bc29f75d2b5ea6a0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Nov 20 08:31:18 2021 +0800

    Introduce gorilla encoding for integer values
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go                       |  31 +++++
 banyand/kv/kv.go                           |  16 ++-
 banyand/stream/stream.go                   |   8 +-
 banyand/tsdb/block.go                      |   2 +-
 banyand/tsdb/tsdb.go                       |   6 +-
 banyand/tsdb/tsdb_test.go                  |   8 +-
 go.mod                                     |   2 +-
 go.sum                                     |   4 +-
 pkg/bit/reader.go                          | 104 +++++++++++++++
 pkg/{bytes/bytes.go => bit/reader_test.go} |  41 ++++--
 pkg/bit/writer.go                          |  92 ++++++++++++++
 pkg/{bytes/bytes.go => bit/writer_test.go} |  50 ++++++--
 pkg/buffer/writer.go                       |  72 +++++++++++
 pkg/encoding/encoding.go                   |  14 ++-
 pkg/encoding/int.go                        | 195 +++++++++++++++++++++++++++++
 pkg/encoding/{stream_chunk.go => plain.go} | 175 +++++++++++++++-----------
 pkg/encoding/xor.go                        | 177 ++++++++++++++++++++++++++
 pkg/encoding/xor_test.go                   |  60 +++++++++
 18 files changed, 932 insertions(+), 125 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 24820f0..728a298 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -259,6 +259,37 @@ func (l *badgerLog) Debugf(f string, v ...interface{}) {
 	l.delegated.Debug().Msgf(f, v...)
 }
 
+var _ bydb.TSetEncoderPool = (*encoderPoolDelegate)(nil)
+
+type encoderPoolDelegate struct {
+	encoding.SeriesEncoderPool
+}
+
+func (e *encoderPoolDelegate) Get(metadata []byte) bydb.TSetEncoder {
+	return e.SeriesEncoderPool.Get(metadata)
+}
+
+func (e *encoderPoolDelegate) Put(encoder bydb.TSetEncoder) {
+	e.SeriesEncoderPool.Put(encoder)
+}
+
+var _ bydb.TSetDecoderPool = (*decoderPoolDelegate)(nil)
+
+type decoderPoolDelegate struct {
+	encoding.SeriesDecoderPool
+}
+
+func (e *decoderPoolDelegate) Get(metadata []byte) bydb.TSetDecoder {
+	return &decoderDelegate{
+		e.SeriesDecoderPool.Get(metadata),
+	}
+}
+
+func (e *decoderPoolDelegate) Put(decoder bydb.TSetDecoder) {
+	dd := decoder.(*decoderDelegate)
+	e.SeriesDecoderPool.Put(dd.SeriesDecoder)
+}
+
 var _ bydb.TSetDecoder = (*decoderDelegate)(nil)
 
 type decoderDelegate struct {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index ea92d1c..2def4dc 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -23,7 +23,6 @@ import (
 	"math"
 
 	"github.com/dgraph-io/badger/v3"
-	"github.com/dgraph-io/badger/v3/bydb"
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -103,16 +102,15 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
 	}
 }
 
-func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions {
+func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encoding.SeriesDecoderPool) TimeSeriesOptions {
 	return func(store TimeSeriesStore) {
 		if btss, ok := store.(*badgerTSS); ok {
-			btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() bydb.TSetEncoder {
-				return encoderFactory()
-			}, func() bydb.TSetDecoder {
-				return &decoderDelegate{
-					SeriesDecoder: decoderFactory(),
-				}
-			})
+			btss.dbOpts = btss.dbOpts.WithExternalCompactor(
+				&encoderPoolDelegate{
+					encoderPool,
+				}, &decoderPoolDelegate{
+					decoderPool,
+				})
 		}
 	}
 }
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 760687d..444a483 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -107,12 +107,8 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error)
 			ShardNum:   sm.schema.GetOpts().GetShardNum(),
 			IndexRules: spec.indexRules,
 			EncodingMethod: tsdb.EncodingMethod{
-				EncoderFactory: func() encoding.SeriesEncoder {
-					return encoding.NewStreamChunkEncoder(chunkSize)
-				},
-				DecoderFactory: func() encoding.SeriesDecoder {
-					return encoding.NewStreamChunkDecoder(chunkSize)
-				},
+				EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+				DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
 			},
 		})
 	if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 2f38308..956bf13 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -78,7 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	if b.store, err = kv.OpenTimeSeriesStore(
 		0,
 		b.path+"/store",
-		kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory),
+		kv.TSSWithEncoding(encodingMethod.EncoderPool, encodingMethod.DecoderPool),
 		kv.TSSWithLogger(b.l),
 	); err != nil {
 		return nil, err
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 8df233b..e9ed19a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -82,8 +82,8 @@ type DatabaseOpts struct {
 }
 
 type EncodingMethod struct {
-	EncoderFactory encoding.SeriesEncoderFactory
-	DecoderFactory encoding.SeriesDecoderFactory
+	EncoderPool encoding.SeriesEncoderPool
+	DecoderPool encoding.SeriesDecoderPool
 }
 
 type database struct {
@@ -124,7 +124,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 			db.logger = pl.Named("tsdb")
 		}
 	}
-	if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil {
+	if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil {
 		return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
 	}
 	if _, err := mkdir(opts.Location); err != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 89c56d6..c570fdf 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -58,12 +58,8 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database
 			Location: tempDir,
 			ShardNum: 1,
 			EncodingMethod: EncodingMethod{
-				EncoderFactory: func() encoding.SeriesEncoder {
-					return nil
-				},
-				DecoderFactory: func() encoding.SeriesDecoder {
-					return nil
-				},
+				EncoderPool: encoding.NewPlainEncoderPool(0),
+				DecoderPool: encoding.NewPlainDecoderPool(0),
 			},
 		})
 	t.NoError(err)
diff --git a/go.mod b/go.mod
index 24e4ce0..b2815cb 100644
--- a/go.mod
+++ b/go.mod
@@ -101,4 +101,4 @@ require (
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a
diff --git a/go.sum b/go.sum
index 957109b..afb3bff 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a h1:kcUQmdVI0E0J8bfwJpbQhWOOxijKNeoEfLsiIkayf1E=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
diff --git a/pkg/bit/reader.go b/pkg/bit/reader.go
new file mode 100644
index 0000000..22cb929
--- /dev/null
+++ b/pkg/bit/reader.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+	"io"
+)
+
+// Reader reads bits from buffer
+type Reader struct {
+	in    io.ByteReader
+	cache byte
+	len   byte
+}
+
+// NewReader creates a bit reader
+func NewReader(in io.ByteReader) *Reader {
+	return &Reader{
+		in: in,
+	}
+}
+
+// ReadBool reads a bit, 1 returns true, 0 returns false
+func (r *Reader) ReadBool() (bool, error) {
+	if r.len == 0 {
+		b, err := r.in.ReadByte()
+		if err != nil {
+			return false, err
+		}
+		r.cache = b
+		r.len = 8
+	}
+	r.len--
+	b := r.cache & 0x80
+	r.cache <<= 1
+	return b != 0, nil
+}
+
+// ReadBits read number of bits
+func (r *Reader) ReadBits(numBits int) (uint64, error) {
+	var result uint64
+
+	for ; numBits >= 8; numBits -= 8 {
+		b, err := r.ReadByte()
+		if err != nil {
+			return 0, err
+		}
+
+		result = (result << 8) | uint64(b)
+	}
+
+	for ; numBits > 0; numBits-- {
+		byt, err := r.ReadBool()
+		if err != nil {
+			return 0, err
+		}
+		result <<= 1
+		if byt {
+			result |= 1
+		}
+	}
+
+	return result, nil
+}
+
+// ReadByte reads a byte
+func (r *Reader) ReadByte() (byte, error) {
+	if r.len == 0 {
+		b, err := r.in.ReadByte()
+		if err != nil {
+			return b, err
+		}
+		r.cache = b
+		return b, err
+	}
+	b, err := r.in.ReadByte()
+	if err != nil {
+		return b, err
+	}
+	result := r.cache | b>>r.len
+	r.cache = b << (8 - r.len)
+	return result, nil
+}
+
+// Reset discards any cached bits; the underlying io.ByteReader is left unchanged
+func (r *Reader) Reset() {
+	r.len = 0
+	r.cache = 0
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/reader_test.go
similarity index 56%
copy from pkg/bytes/bytes.go
copy to pkg/bit/reader_test.go
index b4d051c..6f30bbe 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/reader_test.go
@@ -15,17 +15,38 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package bytes
+package bit
 
-func Join(s ...[]byte) []byte {
-	n := 0
-	for _, v := range s {
-		n += len(v)
-	}
+import (
+	"bytes"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestReader(t *testing.T) {
+	data := []byte{3, 255, 0xcc, 0x1a, 0xbc, 0xde, 0x80}
+
+	r := NewReader(bytes.NewBuffer(data))
+	a := assert.New(t)
+
+	eq(a, byte(3))(r.ReadByte())
+	eq(a, uint64(255))(r.ReadBits(8))
+
+	eq(a, uint64(0xc))(r.ReadBits(4))
+
+	eq(a, uint64(0xc1))(r.ReadBits(8))
+
+	eq(a, uint64(0xabcde))(r.ReadBits(20))
+
+	eq(a, true)(r.ReadBool())
+	eq(a, false)(r.ReadBool())
+
+}
 
-	b, i := make([]byte, n), 0
-	for _, v := range s {
-		i += copy(b[i:], v)
+func eq(a *assert.Assertions, expected interface{}) func(interface{}, error) {
+	return func(actual interface{}, err error) {
+		a.NoError(err)
+		a.Equal(expected, actual)
 	}
-	return b
 }
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
new file mode 100644
index 0000000..6302f9e
--- /dev/null
+++ b/pkg/bit/writer.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+	"bytes"
+)
+
+// Writer writes bits to a bytes.Buffer
+type Writer struct {
+	out       *bytes.Buffer
+	cache     byte
+	available byte
+}
+
+// NewWriter create bit writer
+func NewWriter(buffer *bytes.Buffer) *Writer {
+	var bw Writer
+	bw.Reset(buffer)
+	return &bw
+}
+
+// Reset points the writer at buffer and discards any pending bits
+func (w *Writer) Reset(buffer *bytes.Buffer) {
+	w.out = buffer
+	w.cache = 0
+	w.available = 8
+}
+
+// WriteBool writes a boolean value
+// true: 1
+// false: 0
+func (w *Writer) WriteBool(b bool) {
+	if b {
+		w.cache |= 1 << (w.available - 1)
+	}
+
+	w.available--
+
+	if w.available == 0 {
+		// WriteByte never returns error
+		_ = w.out.WriteByte(w.cache)
+		w.cache = 0
+		w.available = 8
+	}
+}
+
+// WriteBits writes number of bits
+func (w *Writer) WriteBits(u uint64, numBits int) {
+	u <<= 64 - uint(numBits)
+
+	for ; numBits >= 8; numBits -= 8 {
+		byt := byte(u >> 56)
+		w.WriteByte(byt)
+		u <<= 8
+	}
+
+	remainder := byte(u >> 56)
+	for ; numBits > 0; numBits-- {
+		w.WriteBool((remainder & 0x80) != 0)
+		remainder <<= 1
+	}
+}
+
+// WriteByte write a byte
+func (w *Writer) WriteByte(b byte) {
+	_ = w.out.WriteByte(w.cache | (b >> (8 - w.available)))
+	w.cache = b << w.available
+}
+
+// Flush flushes the currently in-process byte
+func (w *Writer) Flush() {
+	if w.available != 8 {
+		_ = w.out.WriteByte(w.cache)
+	}
+	w.Reset(w.out)
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/writer_test.go
similarity index 57%
rename from pkg/bytes/bytes.go
rename to pkg/bit/writer_test.go
index b4d051c..7624ff8 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/writer_test.go
@@ -15,17 +15,41 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package bytes
-
-func Join(s ...[]byte) []byte {
-	n := 0
-	for _, v := range s {
-		n += len(v)
-	}
-
-	b, i := make([]byte, n), 0
-	for _, v := range s {
-		i += copy(b[i:], v)
-	}
-	return b
+package bit
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestWriter(t *testing.T) {
+	out := &bytes.Buffer{}
+	w := NewWriter(out)
+
+	a := assert.New(t)
+
+	w.WriteByte(0xc1)
+	w.WriteBool(false)
+	w.WriteBits(0x3f, 6)
+	w.WriteBool(true)
+	w.WriteByte(0xac)
+	w.WriteBits(0x01, 1)
+	w.WriteBits(0x1248f, 20)
+	w.Flush()
+
+	w.WriteByte(0x01)
+	w.WriteByte(0x02)
+
+	w.WriteBits(0x0f, 4)
+
+	w.WriteByte(0x80)
+	w.WriteByte(0x8f)
+	w.Flush()
+
+	w.WriteBits(0x01, 1)
+	w.WriteByte(0xff)
+	w.Flush()
+	a.Equal([]byte{0xc1, 0x7f, 0xac, 0x89, 0x24, 0x78, 0x01, 0x02, 0xf8, 0x08, 0xf0, 0xff, 0x80}, out.Bytes())
 }
diff --git a/pkg/buffer/writer.go b/pkg/buffer/writer.go
new file mode 100644
index 0000000..e109d25
--- /dev/null
+++ b/pkg/buffer/writer.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+	"bytes"
+	"encoding/binary"
+)
+
+// Writer writes data into a buffer
+type Writer struct {
+	buf *bytes.Buffer
+
+	scratch [binary.MaxVarintLen64]byte
+}
+
+func NewBufferWriter(buf *bytes.Buffer) *Writer {
+	return &Writer{
+		buf: buf,
+	}
+}
+
+func (w *Writer) Write(p []byte) {
+	_, _ = w.buf.Write(p)
+}
+
+func (w *Writer) WriteTo(other *Writer) (n int64) {
+	n, _ = w.buf.WriteTo(other.buf)
+	return n
+}
+
+func (w *Writer) PutUint16(v uint16) {
+	binary.LittleEndian.PutUint16(w.scratch[:], v)
+	_, _ = w.buf.Write(w.scratch[:2])
+}
+
+func (w *Writer) PutUint32(v uint32) {
+	binary.LittleEndian.PutUint32(w.scratch[:], v)
+	_, _ = w.buf.Write(w.scratch[:4])
+}
+
+func (w *Writer) PutUint64(v uint64) {
+	binary.LittleEndian.PutUint64(w.scratch[:], v)
+	_, _ = w.buf.Write(w.scratch[:8])
+}
+
+func (w *Writer) Reset() {
+	w.buf.Reset()
+}
+
+func (w *Writer) Len() int {
+	return w.buf.Len()
+}
+
+func (w *Writer) Bytes() []byte {
+	return w.buf.Bytes()
+}
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 5e04e72..05a1f14 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -21,7 +21,10 @@ import "github.com/pkg/errors"
 
 var ErrEncodeEmpty = errors.New("encode an empty value")
 
-type SeriesEncoderFactory func() SeriesEncoder
+type SeriesEncoderPool interface {
+	Get(metadata []byte) SeriesEncoder
+	Put(encoder SeriesEncoder)
+}
 
 // SeriesEncoder encodes time series data point
 type SeriesEncoder interface {
@@ -30,19 +33,22 @@ type SeriesEncoder interface {
 	// IsFull returns whether the encoded data reached its capacity
 	IsFull() bool
 	// Reset the underlying buffer
-	Reset()
+	Reset(key []byte)
 	// Encode the time series data point to a binary
 	Encode() ([]byte, error)
 	// StartTime indicates the first entry's time
 	StartTime() uint64
 }
 
-type SeriesDecoderFactory func() SeriesDecoder
+type SeriesDecoderPool interface {
+	Get(metadata []byte) SeriesDecoder
+	Put(encoder SeriesDecoder)
+}
 
 // SeriesDecoder decodes encoded time series data
 type SeriesDecoder interface {
 	// Decode the time series data
-	Decode(data []byte) error
+	Decode(key, data []byte) error
 	// Len denotes the size of iterator
 	Len() int
 	// IsFull returns whether the encoded data reached its capacity
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
new file mode 100644
index 0000000..9571ec4
--- /dev/null
+++ b/pkg/encoding/int.go
@@ -0,0 +1,195 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"bytes"
+	"encoding/binary"
+	"time"
+
+	"github.com/apache/skywalking-banyandb/pkg/bit"
+	"github.com/apache/skywalking-banyandb/pkg/buffer"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+	_ SeriesEncoder = (*intEncoder)(nil)
+)
+
+type ParseInterval = func(key []byte) time.Duration
+
+type intEncoder struct {
+	buff      *bytes.Buffer
+	bw        *bit.Writer
+	values    *XOREncoder
+	fn        ParseInterval
+	interval  time.Duration
+	startTime uint64
+	num       int
+	size      int
+}
+
+func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+	buff := &bytes.Buffer{}
+	bw := bit.NewWriter(buff)
+	return &intEncoder{
+		buff:   buff,
+		bw:     bw,
+		values: NewXOREncoder(bw),
+		fn:     fn,
+		size:   size,
+	}
+}
+
+func (ie *intEncoder) Append(ts uint64, value []byte) {
+	if len(value) > 8 {
+		return
+	}
+	if ie.startTime == 0 {
+		ie.startTime = ts
+	}
+	gap := int(ts) - int(ie.startTime)
+	if gap < 0 {
+		return
+	}
+	zeroNum := gap/int(ie.interval) - 1
+	for i := 0; i < zeroNum; i++ {
+		ie.bw.WriteBool(false)
+		ie.num++
+	}
+	ie.bw.WriteBool(true)
+	ie.values.Write(binary.LittleEndian.Uint64(value))
+	ie.num++
+}
+
+func (ie *intEncoder) IsFull() bool {
+	return ie.num >= ie.size
+}
+
+func (ie *intEncoder) Reset(key []byte) {
+	ie.bw.Reset(nil) // NOTE(review): Writer.Reset stores its argument as the output, so this detaches the bit writer from ie.buff; startTime and num are also left untouched here - confirm intent
+	ie.interval = ie.fn(key)
+}
+
+func (ie *intEncoder) Encode() ([]byte, error) {
+	ie.bw.Flush()
+	buffWriter := buffer.NewBufferWriter(ie.buff)
+	buffWriter.PutUint64(ie.startTime)
+	buffWriter.PutUint16(uint16(ie.num)) // trailer carries the number of slots written; the decoder reads it back as its num
+	return ie.buff.Bytes(), nil
+}
+
+func (ie *intEncoder) StartTime() uint64 {
+	return ie.startTime
+}
+
+var _ SeriesDecoder = (*intDecoder)(nil)
+
+type intDecoder struct {
+	fn        ParseInterval
+	size      int
+	interval  time.Duration
+	startTime uint64
+	num       int
+	area      []byte
+}
+
+func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
+	return &intDecoder{
+		fn:   fn,
+		size: size,
+	}
+}
+
+func (i *intDecoder) Decode(key, data []byte) error { // pointer receiver: the parsed trailer must persist on the decoder
+	i.interval = i.fn(key)
+	i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
+	i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
+	i.area = data[:len(data)-10]
+	return nil
+}
+
+func (i *intDecoder) Len() int {
+	return i.num
+}
+
+func (i *intDecoder) IsFull() bool {
+	return i.num >= i.size
+}
+
+func (i *intDecoder) Get(ts uint64) ([]byte, error) {
+	for iter := i.Iterator(); iter.Next(); {
+		if iter.Time() == ts {
+			return iter.Val(), nil
+		}
+	}
+	return nil, nil
+}
+
+func (i *intDecoder) Iterator() SeriesIterator {
+	br := bit.NewReader(bytes.NewReader(i.area))
+	return &intIterator{
+		startTime: i.startTime,
+		interval:  int(i.interval),
+		br:        br,
+		values:    NewXORDecoder(br),
+	}
+}
+
+var _ SeriesIterator = (*intIterator)(nil)
+
+type intIterator struct {
+	startTime uint64
+	interval  int
+	br        *bit.Reader
+	values    *XORDecoder
+
+	currVal  uint64
+	currTime uint64
+	index    int
+	err      error
+}
+
+func (i *intIterator) Next() bool {
+	var b bool
+	b, i.err = i.br.ReadBool()
+	if i.err != nil {
+		return false
+	}
+	i.currVal = 0 // default for a slot without a value; overwritten below when the flag bit is set
+	if b {
+		if i.values.Next() {
+			i.currVal = i.values.Value()
+		}
+	}
+	i.currTime = i.startTime + uint64(i.interval*i.index)
+	i.index++
+	return true
+}
+
+func (i *intIterator) Val() []byte {
+	return convert.Uint64ToBytes(i.currVal)
+}
+
+func (i *intIterator) Time() uint64 {
+	return i.currTime
+}
+
+func (i *intIterator) Error() error {
+	return i.err
+}
diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/plain.go
similarity index 54%
rename from pkg/encoding/stream_chunk.go
rename to pkg/encoding/plain.go
index 7ff5094..0b3fb8c 100644
--- a/pkg/encoding/stream_chunk.go
+++ b/pkg/encoding/plain.go
@@ -22,106 +22,147 @@ import (
 	"encoding/binary"
 	"fmt"
 	"sort"
+	"sync"
 
 	"github.com/klauspost/compress/zstd"
 	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/pkg/buffer"
 )
 
 var (
-	decoder, _               = zstd.NewReader(nil)
-	encoder, _               = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
-	_          SeriesEncoder = (*streamChunkEncoder)(nil)
-	_          SeriesDecoder = (*StreamChunkDecoder)(nil)
+	encoderPool = sync.Pool{
+		New: newPlainEncoder,
+	}
+	decoderPool = sync.Pool{
+		New: func() interface{} {
+			return &plainDecoder{}
+		},
+	}
+)
+
+type plainEncoderPool struct {
+	pool *sync.Pool
+	size int
+}
+
+func NewPlainEncoderPool(size int) SeriesEncoderPool {
+	return &plainEncoderPool{
+		pool: &encoderPool,
+		size: size,
+	}
+}
+
+func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+	encoder := b.pool.Get().(*plainEncoder)
+	encoder.Reset(metadata)
+	encoder.valueSize = b.size
+	return encoder
+}
+
+func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
+	b.pool.Put(encoder)
+}
+
+type plainDecoderPool struct {
+	pool *sync.Pool
+	size int
+}
+
+func NewPlainDecoderPool(size int) SeriesDecoderPool {
+	return &plainDecoderPool{
+		pool: &decoderPool,
+		size: size,
+	}
+}
+
+func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+	decoder := b.pool.Get().(*plainDecoder)
+	decoder.valueSize = b.size
+	return decoder
+}
+
+func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
+	b.pool.Put(decoder)
+}
+
+var (
+	zstdDecoder, _               = zstd.NewReader(nil)
+	zstdEncoder, _               = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+	_              SeriesEncoder = (*plainEncoder)(nil)
+	_              SeriesDecoder = (*plainDecoder)(nil)
 )
 
-//streamChunkEncoder backport to reduced value
-type streamChunkEncoder struct {
-	tsBuff    bytes.Buffer
-	valBuff   bytes.Buffer
-	scratch   [binary.MaxVarintLen64]byte
+//plainEncoder backport to reduced value
+type plainEncoder struct {
+	tsBuff    *buffer.Writer
+	valBuff   *buffer.Writer
 	len       uint32
 	num       uint32
 	startTime uint64
 	valueSize int
 }
 
-func NewStreamChunkEncoder(size int) SeriesEncoder {
-	return &streamChunkEncoder{
-		valueSize: size,
+func newPlainEncoder() interface{} {
+	return &plainEncoder{
+		tsBuff:  buffer.NewBufferWriter(&bytes.Buffer{}),
+		valBuff: buffer.NewBufferWriter(&bytes.Buffer{}),
 	}
 }
 
-func (t *streamChunkEncoder) Append(ts uint64, value []byte) {
+func (t *plainEncoder) Append(ts uint64, value []byte) {
 	if t.startTime == 0 {
 		t.startTime = ts
 	} else if t.startTime > ts {
 		t.startTime = ts
 	}
 	vLen := len(value)
-	offset := uint32(len(t.valBuff.Bytes()))
-	t.valBuff.Write(t.putUint32(uint32(vLen)))
+	offset := uint32(t.valBuff.Len())
+	t.valBuff.PutUint32(uint32(vLen))
 	t.valBuff.Write(value)
-	t.tsBuff.Write(t.putUint64(ts))
-	t.tsBuff.Write(t.putUint32(offset))
-	t.num = t.num + 1
+	t.tsBuff.PutUint64(ts)
+	t.tsBuff.PutUint32(offset)
+	t.num++
 }
 
-func (t *streamChunkEncoder) IsFull() bool {
+func (t *plainEncoder) IsFull() bool {
 	return t.valBuff.Len() >= t.valueSize
 }
 
-func (t *streamChunkEncoder) Reset() {
+func (t *plainEncoder) Reset(_ []byte) {
 	t.tsBuff.Reset()
 	t.valBuff.Reset()
 	t.num = 0
 	t.startTime = 0
 }
 
-func (t *streamChunkEncoder) Encode() ([]byte, error) {
+func (t *plainEncoder) Encode() ([]byte, error) {
 	if t.tsBuff.Len() < 1 {
 		return nil, ErrEncodeEmpty
 	}
 	val := t.valBuff.Bytes()
 	t.len = uint32(len(val))
-	_, err := t.tsBuff.WriteTo(&t.valBuff)
-	if err != nil {
-		return nil, err
-	}
-	t.valBuff.Write(t.putUint32(t.num))
-	t.valBuff.Write(t.putUint32(t.len))
+	t.tsBuff.WriteTo(t.valBuff)
+	t.valBuff.PutUint32(t.num)
+	t.valBuff.PutUint32(t.len)
 	data := t.valBuff.Bytes()
 	l := len(data)
 	dst := make([]byte, 0, compressBound(l))
-	dst = encoder.EncodeAll(data, dst)
-	result := make([]byte, len(dst)+2)
-	copy(result, dst)
-	copy(result[len(dst):], t.putUint16(uint16(l)))
-	return result, nil
+	dst = zstdEncoder.EncodeAll(data, dst)
+	result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, 0, len(dst)+2))) // zero length, preset capacity: NewBuffer treats the slice as initial contents
+	result.Write(dst)
+	result.PutUint16(uint16(l))
+	return result.Bytes(), nil
 }
 
 func compressBound(srcSize int) int {
 	return srcSize + (srcSize >> 8)
 }
 
-func (t *streamChunkEncoder) StartTime() uint64 {
+func (t *plainEncoder) StartTime() uint64 {
 	return t.startTime
 }
 
-func (t *streamChunkEncoder) putUint16(v uint16) []byte {
-	binary.LittleEndian.PutUint16(t.scratch[:], v)
-	return t.scratch[:2]
-}
-
-func (t *streamChunkEncoder) putUint32(v uint32) []byte {
-	binary.LittleEndian.PutUint32(t.scratch[:], v)
-	return t.scratch[:4]
-}
-
-func (t *streamChunkEncoder) putUint64(v uint64) []byte {
-	binary.LittleEndian.PutUint64(t.scratch[:], v)
-	return t.scratch[:8]
-}
-
 const (
 	// TsLen equals ts(uint64) + data_offset(uint32)
 	TsLen = 8 + 4
@@ -129,8 +170,8 @@ const (
 
 var ErrInvalidValue = errors.New("invalid encoded value")
 
-//StreamChunkDecoder decodes encoded time index
-type StreamChunkDecoder struct {
+//plainDecoder decodes encoded time index
+type plainDecoder struct {
 	ts        []byte
 	val       []byte
 	len       uint32
@@ -138,20 +179,14 @@ type StreamChunkDecoder struct {
 	valueSize int
 }
 
-func NewStreamChunkDecoder(size int) SeriesDecoder {
-	return &StreamChunkDecoder{
-		valueSize: size,
-	}
-}
-
-func (t *StreamChunkDecoder) Len() int {
+func (t *plainDecoder) Len() int {
 	return int(t.num)
 }
 
-func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
+func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
 	var data []byte
 	size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
-	if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
+	if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
 		return err
 	}
 	l := uint32(len(data))
@@ -170,11 +205,11 @@ func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
 	return nil
 }
 
-func (t *StreamChunkDecoder) IsFull() bool {
+func (t *plainDecoder) IsFull() bool {
 	return int(t.len) >= t.valueSize
 }
 
-func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
+func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
 	i := sort.Search(int(t.num), func(i int) bool {
 		slot := getTSSlot(t.ts, i)
 		return parseTS(slot) <= ts
@@ -189,7 +224,7 @@ func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
 	return getVal(t.val, parseOffset(slot))
 }
 
-func (t *StreamChunkDecoder) Iterator() SeriesIterator {
+func (t *plainDecoder) Iterator() SeriesIterator {
 	return newBlockItemIterator(t)
 }
 
@@ -213,17 +248,17 @@ func parseOffset(tsSlot []byte) uint32 {
 	return binary.LittleEndian.Uint32(tsSlot[8:])
 }
 
-var _ SeriesIterator = (*chunkIterator)(nil)
+var _ SeriesIterator = (*plainIterator)(nil)
 
-type chunkIterator struct {
+type plainIterator struct {
 	index []byte
 	data  []byte
 	idx   int
 	num   int
 }
 
-func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
-	return &chunkIterator{
+func newBlockItemIterator(decoder *plainDecoder) SeriesIterator {
+	return &plainIterator{
 		idx:   -1,
 		index: decoder.ts,
 		data:  decoder.val,
@@ -231,20 +266,20 @@ func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
 	}
 }
 
-func (b *chunkIterator) Next() bool {
+func (b *plainIterator) Next() bool {
 	b.idx++
 	return b.idx >= 0 && b.idx < b.num
 }
 
-func (b *chunkIterator) Val() []byte {
+func (b *plainIterator) Val() []byte {
 	v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx)))
 	return v
 }
 
-func (b *chunkIterator) Time() uint64 {
+func (b *plainIterator) Time() uint64 {
 	return parseTS(getTSSlot(b.index, b.idx))
 }
 
-func (b *chunkIterator) Error() error {
+func (b *plainIterator) Error() error {
 	return nil
 }
diff --git a/pkg/encoding/xor.go b/pkg/encoding/xor.go
new file mode 100644
index 0000000..43203ca
--- /dev/null
+++ b/pkg/encoding/xor.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"math/bits"
+
+	"github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+const (
+	ctrlBitsNoContainMeaningful = 0x2
+	ctrlBitsContainMeaningful   = 0x3
+)
+
+// XOREncoder intends to compress uint64 data
+// https://www.vldb.org/pvldb/vol8/p1816-teller.pdf
+type XOREncoder struct {
+	bw       *bit.Writer
+	preVal   uint64
+	leading  int
+	trailing int
+
+	first bool
+}
+
+// NewXOREncoder creates an XOR encoder that compresses uint64 data into bw
+func NewXOREncoder(bw *bit.Writer) *XOREncoder {
+	return &XOREncoder{
+		bw:    bw,
+		first: true,
+	}
+}
+
+func (e *XOREncoder) Write(val uint64) {
+	if e.first {
+		e.first = false
+		e.preVal = val
+		e.bw.WriteBits(val, 64)
+		return
+	}
+
+	delta := val ^ e.preVal
+	e.preVal = val
+	if delta == 0 {
+		e.bw.WriteBool(false)
+		return
+	}
+
+	leading := bits.LeadingZeros64(delta)
+	trailing := bits.TrailingZeros64(delta)
+	if leading >= e.leading && trailing >= e.trailing {
+		// write control '10' to reuse previous block meaningful bits
+		e.bw.WriteBits(ctrlBitsNoContainMeaningful, 2)
+		e.bw.WriteBits(delta>>uint(e.trailing), 64-e.leading-e.trailing)
+	} else {
+		// write control '11' to create a new block meaningful bits
+		e.bw.WriteBits(ctrlBitsContainMeaningful, 2)
+		meaningfulLen := 64 - leading - trailing
+		e.bw.WriteBits(uint64(leading), 6)
+		// meaningfulLen is at least 1, so we can subtract 1 from it and encode it in 6 bits
+		e.bw.WriteBits(uint64(meaningfulLen-1), 6)
+		e.bw.WriteBits(delta>>uint(trailing), meaningfulLen)
+
+		e.leading = leading
+		e.trailing = trailing
+	}
+}
+
+// XORDecoder decodes buffer to uint64 values using xor compress
+type XORDecoder struct {
+	val uint64
+
+	br *bit.Reader
+
+	leading  uint64
+	trailing uint64
+
+	first bool
+	err   error
+}
+
+// NewXORDecoder creates an XOR decoder that decompresses uint64 values read from br
+func NewXORDecoder(br *bit.Reader) *XORDecoder {
+	s := &XORDecoder{
+		br:    br,
+		first: true,
+	}
+	return s
+}
+
+// Reset restores the decoder's initial state (first, leading, trailing, val); br and err are left untouched
+func (d *XORDecoder) Reset() {
+	d.first = true
+	d.leading = 0
+	d.trailing = 0
+	d.val = 0
+}
+
+// Next decodes the next value from the bit reader and reports whether one was read;
+// the data format mirrors what XOREncoder.Write produces
+func (d *XORDecoder) Next() bool {
+	if d.first {
+		// read first value
+		d.first = false
+		d.val, d.err = d.br.ReadBits(64)
+		return d.err == nil
+	}
+
+	var b bool
+	// read delta control bit
+	b, d.err = d.br.ReadBool()
+	if d.err != nil {
+		return false
+	}
+	if !b {
+		return true
+	}
+	ctrlBits := ctrlBitsNoContainMeaningful
+	// read control bit
+	b, d.err = d.br.ReadBool()
+	if d.err != nil {
+		return false
+	}
+	if b {
+		ctrlBits |= 1
+	}
+	var blockSize uint64
+	if ctrlBits == ctrlBitsNoContainMeaningful {
+		blockSize = 64 - d.leading - d.trailing
+	} else {
+		// read leading and block size (trailing is derived), because the block differs from the previous one
+		d.leading, d.err = d.br.ReadBits(6)
+		if d.err != nil {
+			return false
+		}
+		blockSize, d.err = d.br.ReadBits(6)
+		if d.err != nil {
+			return false
+		}
+		blockSize++
+		d.trailing = 64 - d.leading - blockSize
+	}
+	delta, err := d.br.ReadBits(int(blockSize))
+	if err != nil {
+		d.err = err
+		return false
+	}
+	val := delta << d.trailing
+	d.val ^= val
+	return true
+}
+
+// Value returns uint64 from buffer
+func (d *XORDecoder) Value() uint64 {
+	return d.val
+}
+
+// Err returns error raised in Next()
+func (d *XORDecoder) Err() error {
+	return d.err
+}
diff --git a/pkg/encoding/xor_test.go b/pkg/encoding/xor_test.go
new file mode 100644
index 0000000..282ab57
--- /dev/null
+++ b/pkg/encoding/xor_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"bytes"
+	"io"
+	"testing"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+func TestXOR(t *testing.T) {
+	var buf bytes.Buffer
+	bitWriter := bit.NewWriter(&buf)
+	e := NewXOREncoder(bitWriter)
+	e.Write(uint64(76))
+	e.Write(uint64(50))
+	e.Write(uint64(50))
+	e.Write(uint64(999999999))
+	e.Write(uint64(100))
+
+	bitWriter.Flush()
+	data := buf.Bytes()
+
+	reader := bit.NewReader(bytes.NewReader(data))
+	d := NewXORDecoder(reader)
+	a := assert.New(t)
+	verify(d, a, uint64(76))
+	verify(d, a, uint64(50))
+	verify(d, a, uint64(50))
+	verify(d, a, uint64(999999999))
+	verify(d, a, uint64(100))
+}
+
+func verify(d *XORDecoder, a *assert.Assertions, except uint64) {
+	a.True(d.Next())
+	if d.Err() != nil && !errors.Is(d.Err(), io.EOF) {
+		a.Fail("error: %v", d.Err())
+	}
+	a.Equal(except, d.Value())
+}