You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/11/20 02:30:07 UTC

[skywalking-banyandb] branch encoding created (now a9e768b)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at a9e768b  Merge remote-tracking branch 'origin/main' into encoding

This branch includes the following new commits:

     new 08a9b4e  Introduce gorilla encoding for integer values
     new a9e768b  Merge remote-tracking branch 'origin/main' into encoding

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 02/02: Merge remote-tracking branch 'origin/main' into encoding

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a9e768bb2bdeab423f1df140922e965ae3505b79
Merge: 08a9b4e aa9fb38
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Nov 20 08:37:56 2021 +0800

    Merge remote-tracking branch 'origin/main' into encoding


[skywalking-banyandb] 01/02: Introduce gorilla encoding for integer values

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 08a9b4e815e655de3cef9172bc29f75d2b5ea6a0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Nov 20 08:31:18 2021 +0800

    Introduce gorilla encoding for integer values
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go                       |  31 +++++
 banyand/kv/kv.go                           |  16 ++-
 banyand/stream/stream.go                   |   8 +-
 banyand/tsdb/block.go                      |   2 +-
 banyand/tsdb/tsdb.go                       |   6 +-
 banyand/tsdb/tsdb_test.go                  |   8 +-
 go.mod                                     |   2 +-
 go.sum                                     |   4 +-
 pkg/bit/reader.go                          | 104 +++++++++++++++
 pkg/{bytes/bytes.go => bit/reader_test.go} |  41 ++++--
 pkg/bit/writer.go                          |  92 ++++++++++++++
 pkg/{bytes/bytes.go => bit/writer_test.go} |  50 ++++++--
 pkg/buffer/writer.go                       |  72 +++++++++++
 pkg/encoding/encoding.go                   |  14 ++-
 pkg/encoding/int.go                        | 195 +++++++++++++++++++++++++++++
 pkg/encoding/{stream_chunk.go => plain.go} | 175 +++++++++++++++-----------
 pkg/encoding/xor.go                        | 177 ++++++++++++++++++++++++++
 pkg/encoding/xor_test.go                   |  60 +++++++++
 18 files changed, 932 insertions(+), 125 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 24820f0..728a298 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -259,6 +259,37 @@ func (l *badgerLog) Debugf(f string, v ...interface{}) {
 	l.delegated.Debug().Msgf(f, v...)
 }
 
+var _ bydb.TSetEncoderPool = (*encoderPoolDelegate)(nil)
+
+type encoderPoolDelegate struct {
+	encoding.SeriesEncoderPool
+}
+
+func (e *encoderPoolDelegate) Get(metadata []byte) bydb.TSetEncoder {
+	return e.SeriesEncoderPool.Get(metadata)
+}
+
+func (e *encoderPoolDelegate) Put(encoder bydb.TSetEncoder) {
+	e.SeriesEncoderPool.Put(encoder)
+}
+
+var _ bydb.TSetDecoderPool = (*decoderPoolDelegate)(nil)
+
+type decoderPoolDelegate struct {
+	encoding.SeriesDecoderPool
+}
+
+func (e *decoderPoolDelegate) Get(metadata []byte) bydb.TSetDecoder {
+	return &decoderDelegate{
+		e.SeriesDecoderPool.Get(metadata),
+	}
+}
+
+func (e *decoderPoolDelegate) Put(decoder bydb.TSetDecoder) {
+	dd := decoder.(*decoderDelegate)
+	e.SeriesDecoderPool.Put(dd.SeriesDecoder)
+}
+
 var _ bydb.TSetDecoder = (*decoderDelegate)(nil)
 
 type decoderDelegate struct {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index ea92d1c..2def4dc 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -23,7 +23,6 @@ import (
 	"math"
 
 	"github.com/dgraph-io/badger/v3"
-	"github.com/dgraph-io/badger/v3/bydb"
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -103,16 +102,15 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
 	}
 }
 
-func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions {
+func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encoding.SeriesDecoderPool) TimeSeriesOptions {
 	return func(store TimeSeriesStore) {
 		if btss, ok := store.(*badgerTSS); ok {
-			btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() bydb.TSetEncoder {
-				return encoderFactory()
-			}, func() bydb.TSetDecoder {
-				return &decoderDelegate{
-					SeriesDecoder: decoderFactory(),
-				}
-			})
+			btss.dbOpts = btss.dbOpts.WithExternalCompactor(
+				&encoderPoolDelegate{
+					encoderPool,
+				}, &decoderPoolDelegate{
+					decoderPool,
+				})
 		}
 	}
 }
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 760687d..444a483 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -107,12 +107,8 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error)
 			ShardNum:   sm.schema.GetOpts().GetShardNum(),
 			IndexRules: spec.indexRules,
 			EncodingMethod: tsdb.EncodingMethod{
-				EncoderFactory: func() encoding.SeriesEncoder {
-					return encoding.NewStreamChunkEncoder(chunkSize)
-				},
-				DecoderFactory: func() encoding.SeriesDecoder {
-					return encoding.NewStreamChunkDecoder(chunkSize)
-				},
+				EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+				DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
 			},
 		})
 	if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 2f38308..956bf13 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -78,7 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	if b.store, err = kv.OpenTimeSeriesStore(
 		0,
 		b.path+"/store",
-		kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory),
+		kv.TSSWithEncoding(encodingMethod.EncoderPool, encodingMethod.DecoderPool),
 		kv.TSSWithLogger(b.l),
 	); err != nil {
 		return nil, err
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 8df233b..e9ed19a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -82,8 +82,8 @@ type DatabaseOpts struct {
 }
 
 type EncodingMethod struct {
-	EncoderFactory encoding.SeriesEncoderFactory
-	DecoderFactory encoding.SeriesDecoderFactory
+	EncoderPool encoding.SeriesEncoderPool
+	DecoderPool encoding.SeriesDecoderPool
 }
 
 type database struct {
@@ -124,7 +124,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 			db.logger = pl.Named("tsdb")
 		}
 	}
-	if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil {
+	if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil {
 		return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
 	}
 	if _, err := mkdir(opts.Location); err != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 89c56d6..c570fdf 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -58,12 +58,8 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database
 			Location: tempDir,
 			ShardNum: 1,
 			EncodingMethod: EncodingMethod{
-				EncoderFactory: func() encoding.SeriesEncoder {
-					return nil
-				},
-				DecoderFactory: func() encoding.SeriesDecoder {
-					return nil
-				},
+				EncoderPool: encoding.NewPlainEncoderPool(0),
+				DecoderPool: encoding.NewPlainDecoderPool(0),
 			},
 		})
 	t.NoError(err)
diff --git a/go.mod b/go.mod
index 24e4ce0..b2815cb 100644
--- a/go.mod
+++ b/go.mod
@@ -101,4 +101,4 @@ require (
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a
diff --git a/go.sum b/go.sum
index 957109b..afb3bff 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a h1:kcUQmdVI0E0J8bfwJpbQhWOOxijKNeoEfLsiIkayf1E=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
diff --git a/pkg/bit/reader.go b/pkg/bit/reader.go
new file mode 100644
index 0000000..22cb929
--- /dev/null
+++ b/pkg/bit/reader.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+	"io"
+)
+
+// Reader reads bits from buffer
+type Reader struct {
+	in    io.ByteReader
+	cache byte
+	len   byte
+}
+
+// NewReader creates a bit reader that pulls bytes from in
+func NewReader(in io.ByteReader) *Reader {
+	return &Reader{
+		in: in,
+	}
+}
+
+// ReadBool reads a bit, 1 returns true, 0 returns false
+func (r *Reader) ReadBool() (bool, error) {
+	if r.len == 0 {
+		b, err := r.in.ReadByte()
+		if err != nil {
+			return false, err
+		}
+		r.cache = b
+		r.len = 8
+	}
+	r.len--
+	b := r.cache & 0x80
+	r.cache <<= 1
+	return b != 0, nil
+}
+
+// ReadBits reads numBits bits, most significant bit first, into the low bits of the result
+func (r *Reader) ReadBits(numBits int) (uint64, error) {
+	var result uint64
+
+	for ; numBits >= 8; numBits -= 8 {
+		b, err := r.ReadByte()
+		if err != nil {
+			return 0, err
+		}
+
+		result = (result << 8) | uint64(b)
+	}
+
+	for ; numBits > 0; numBits-- {
+		byt, err := r.ReadBool()
+		if err != nil {
+			return 0, err
+		}
+		result <<= 1
+		if byt {
+			result |= 1
+		}
+	}
+
+	return result, nil
+}
+
+// ReadByte reads a byte
+func (r *Reader) ReadByte() (byte, error) {
+	if r.len == 0 {
+		b, err := r.in.ReadByte()
+		if err != nil {
+			return b, err
+		}
+		r.cache = b
+		return b, err
+	}
+	b, err := r.in.ReadByte()
+	if err != nil {
+		return b, err
+	}
+	result := r.cache | b>>r.len
+	r.cache = b << (8 - r.len)
+	return result, nil
+}
+
+// Reset discards any cached bits; the underlying byte source is left unchanged
+func (r *Reader) Reset() {
+	r.len = 0
+	r.cache = 0
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/reader_test.go
similarity index 56%
copy from pkg/bytes/bytes.go
copy to pkg/bit/reader_test.go
index b4d051c..6f30bbe 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/reader_test.go
@@ -15,17 +15,38 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package bytes
+package bit
 
-func Join(s ...[]byte) []byte {
-	n := 0
-	for _, v := range s {
-		n += len(v)
-	}
+import (
+	"bytes"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestReader(t *testing.T) {
+	data := []byte{3, 255, 0xcc, 0x1a, 0xbc, 0xde, 0x80}
+
+	r := NewReader(bytes.NewBuffer(data))
+	a := assert.New(t)
+
+	eq(a, byte(3))(r.ReadByte())
+	eq(a, uint64(255))(r.ReadBits(8))
+
+	eq(a, uint64(0xc))(r.ReadBits(4))
+
+	eq(a, uint64(0xc1))(r.ReadBits(8))
+
+	eq(a, uint64(0xabcde))(r.ReadBits(20))
+
+	eq(a, true)(r.ReadBool())
+	eq(a, false)(r.ReadBool())
+
+}
 
-	b, i := make([]byte, n), 0
-	for _, v := range s {
-		i += copy(b[i:], v)
+func eq(a *assert.Assertions, expected interface{}) func(interface{}, error) {
+	return func(actual interface{}, err error) {
+		a.NoError(err)
+		a.Equal(expected, actual)
 	}
-	return b
 }
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
new file mode 100644
index 0000000..6302f9e
--- /dev/null
+++ b/pkg/bit/writer.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+	"bytes"
+)
+
+// Writer writes bits to a bytes.Buffer
+type Writer struct {
+	out       *bytes.Buffer
+	cache     byte
+	available byte
+}
+
+// NewWriter creates a bit writer that appends to buffer
+func NewWriter(buffer *bytes.Buffer) *Writer {
+	var bw Writer
+	bw.Reset(buffer)
+	return &bw
+}
+
+// Reset points the writer at buffer and discards any pending bits
+func (w *Writer) Reset(buffer *bytes.Buffer) {
+	w.out = buffer
+	w.cache = 0
+	w.available = 8
+}
+
+// WriteBool writes a boolean value
+// true: 1
+// false: 0
+func (w *Writer) WriteBool(b bool) {
+	if b {
+		w.cache |= 1 << (w.available - 1)
+	}
+
+	w.available--
+
+	if w.available == 0 {
+		// WriteByte never returns error
+		_ = w.out.WriteByte(w.cache)
+		w.cache = 0
+		w.available = 8
+	}
+}
+
+// WriteBits writes the lowest numBits bits of u, most significant bit first
+func (w *Writer) WriteBits(u uint64, numBits int) {
+	u <<= 64 - uint(numBits)
+
+	for ; numBits >= 8; numBits -= 8 {
+		byt := byte(u >> 56)
+		w.WriteByte(byt)
+		u <<= 8
+	}
+
+	remainder := byte(u >> 56)
+	for ; numBits > 0; numBits-- {
+		w.WriteBool((remainder & 0x80) != 0)
+		remainder <<= 1
+	}
+}
+
+// WriteByte writes a byte
+func (w *Writer) WriteByte(b byte) {
+	_ = w.out.WriteByte(w.cache | (b >> (8 - w.available)))
+	w.cache = b << w.available
+}
+
+// Flush flushes the currently in-process byte
+func (w *Writer) Flush() {
+	if w.available != 8 {
+		_ = w.out.WriteByte(w.cache)
+	}
+	w.Reset(w.out)
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/writer_test.go
similarity index 57%
rename from pkg/bytes/bytes.go
rename to pkg/bit/writer_test.go
index b4d051c..7624ff8 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/writer_test.go
@@ -15,17 +15,41 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package bytes
-
-func Join(s ...[]byte) []byte {
-	n := 0
-	for _, v := range s {
-		n += len(v)
-	}
-
-	b, i := make([]byte, n), 0
-	for _, v := range s {
-		i += copy(b[i:], v)
-	}
-	return b
+package bit
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestWriter(t *testing.T) {
+	out := &bytes.Buffer{}
+	w := NewWriter(out)
+
+	a := assert.New(t)
+
+	w.WriteByte(0xc1)
+	w.WriteBool(false)
+	w.WriteBits(0x3f, 6)
+	w.WriteBool(true)
+	w.WriteByte(0xac)
+	w.WriteBits(0x01, 1)
+	w.WriteBits(0x1248f, 20)
+	w.Flush()
+
+	w.WriteByte(0x01)
+	w.WriteByte(0x02)
+
+	w.WriteBits(0x0f, 4)
+
+	w.WriteByte(0x80)
+	w.WriteByte(0x8f)
+	w.Flush()
+
+	w.WriteBits(0x01, 1)
+	w.WriteByte(0xff)
+	w.Flush()
+	a.Equal([]byte{0xc1, 0x7f, 0xac, 0x89, 0x24, 0x78, 0x01, 0x02, 0xf8, 0x08, 0xf0, 0xff, 0x80}, out.Bytes())
 }
diff --git a/pkg/buffer/writer.go b/pkg/buffer/writer.go
new file mode 100644
index 0000000..e109d25
--- /dev/null
+++ b/pkg/buffer/writer.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+	"bytes"
+	"encoding/binary"
+)
+
+// Writer writes data into a buffer
+type Writer struct {
+	buf *bytes.Buffer
+
+	scratch [binary.MaxVarintLen64]byte
+}
+
+func NewBufferWriter(buf *bytes.Buffer) *Writer {
+	return &Writer{
+		buf: buf,
+	}
+}
+
+func (w *Writer) Write(p []byte) {
+	_, _ = w.buf.Write(p)
+}
+
+func (w *Writer) WriteTo(other *Writer) (n int64) {
+	n, _ = w.buf.WriteTo(other.buf)
+	return n
+}
+
+func (w *Writer) PutUint16(v uint16) {
+	binary.LittleEndian.PutUint16(w.scratch[:], v)
+	_, _ = w.buf.Write(w.scratch[:2])
+}
+
+func (w *Writer) PutUint32(v uint32) {
+	binary.LittleEndian.PutUint32(w.scratch[:], v)
+	_, _ = w.buf.Write(w.scratch[:4])
+}
+
+func (w *Writer) PutUint64(v uint64) {
+	binary.LittleEndian.PutUint64(w.scratch[:], v)
+	_, _ = w.buf.Write(w.scratch[:8])
+}
+
+func (w *Writer) Reset() {
+	w.buf.Reset()
+}
+
+func (w *Writer) Len() int {
+	return w.buf.Len()
+}
+
+func (w *Writer) Bytes() []byte {
+	return w.buf.Bytes()
+}
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 5e04e72..05a1f14 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -21,7 +21,10 @@ import "github.com/pkg/errors"
 
 var ErrEncodeEmpty = errors.New("encode an empty value")
 
-type SeriesEncoderFactory func() SeriesEncoder
+type SeriesEncoderPool interface {
+	Get(metadata []byte) SeriesEncoder
+	Put(encoder SeriesEncoder)
+}
 
 // SeriesEncoder encodes time series data point
 type SeriesEncoder interface {
@@ -30,19 +33,22 @@ type SeriesEncoder interface {
 	// IsFull returns whether the encoded data reached its capacity
 	IsFull() bool
 	// Reset the underlying buffer
-	Reset()
+	Reset(key []byte)
 	// Encode the time series data point to a binary
 	Encode() ([]byte, error)
 	// StartTime indicates the first entry's time
 	StartTime() uint64
 }
 
-type SeriesDecoderFactory func() SeriesDecoder
+type SeriesDecoderPool interface {
+	Get(metadata []byte) SeriesDecoder
+	Put(encoder SeriesDecoder)
+}
 
 // SeriesDecoder decodes encoded time series data
 type SeriesDecoder interface {
 	// Decode the time series data
-	Decode(data []byte) error
+	Decode(key, data []byte) error
 	// Len denotes the size of iterator
 	Len() int
 	// IsFull returns whether the encoded data reached its capacity
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
new file mode 100644
index 0000000..9571ec4
--- /dev/null
+++ b/pkg/encoding/int.go
@@ -0,0 +1,195 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"bytes"
+	"encoding/binary"
+	"time"
+
+	"github.com/apache/skywalking-banyandb/pkg/bit"
+	"github.com/apache/skywalking-banyandb/pkg/buffer"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+	_ SeriesEncoder = (*intEncoder)(nil)
+)
+
+type ParseInterval = func(key []byte) time.Duration
+
+type intEncoder struct {
+	buff      *bytes.Buffer
+	bw        *bit.Writer
+	values    *XOREncoder
+	fn        ParseInterval
+	interval  time.Duration
+	startTime uint64
+	num       int
+	size      int
+}
+
+func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+	buff := &bytes.Buffer{}
+	bw := bit.NewWriter(buff)
+	return &intEncoder{
+		buff:   buff,
+		bw:     bw,
+		values: NewXOREncoder(bw),
+		fn:     fn,
+		size:   size,
+	}
+}
+
+func (ie *intEncoder) Append(ts uint64, value []byte) {
+	if len(value) > 8 {
+		return
+	}
+	if ie.startTime == 0 {
+		ie.startTime = ts
+	}
+	gap := int(ts) - int(ie.startTime)
+	if gap < 0 {
+		return
+	}
+	zeroNum := gap/int(ie.interval) - 1
+	for i := 0; i < zeroNum; i++ {
+		ie.bw.WriteBool(false)
+		ie.num++
+	}
+	ie.bw.WriteBool(true)
+	ie.values.Write(binary.LittleEndian.Uint64(value))
+	ie.num++
+}
+
+func (ie *intEncoder) IsFull() bool {
+	return ie.num >= ie.size
+}
+
+func (ie *intEncoder) Reset(key []byte) {
+	ie.bw.Reset(nil)
+	ie.interval = ie.fn(key)
+}
+
+func (ie *intEncoder) Encode() ([]byte, error) {
+	ie.bw.Flush()
+	buffWriter := buffer.NewBufferWriter(ie.buff)
+	buffWriter.PutUint64(ie.startTime)
+	buffWriter.PutUint16(uint16(ie.size))
+	return ie.buff.Bytes(), nil
+}
+
+func (ie *intEncoder) StartTime() uint64 {
+	return ie.startTime
+}
+
+var _ SeriesDecoder = (*intDecoder)(nil)
+
+type intDecoder struct {
+	fn        ParseInterval
+	size      int
+	interval  time.Duration
+	startTime uint64
+	num       int
+	area      []byte
+}
+
+func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
+	return &intDecoder{
+		fn:   fn,
+		size: size,
+	}
+}
+
+func (i intDecoder) Decode(key, data []byte) error {
+	i.interval = i.fn(key)
+	i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
+	i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
+	i.area = data[:len(data)-10]
+	return nil
+}
+
+func (i intDecoder) Len() int {
+	return i.num
+}
+
+func (i intDecoder) IsFull() bool {
+	return i.num >= i.size
+}
+
+func (i intDecoder) Get(ts uint64) ([]byte, error) {
+	for iter := i.Iterator(); iter.Next(); {
+		if iter.Time() == ts {
+			return iter.Val(), nil
+		}
+	}
+	return nil, nil
+}
+
+func (i intDecoder) Iterator() SeriesIterator {
+	br := bit.NewReader(bytes.NewReader(i.area))
+	return &intIterator{
+		startTime: i.startTime,
+		interval:  int(i.interval),
+		br:        br,
+		values:    NewXORDecoder(br),
+	}
+}
+
+var _ SeriesIterator = (*intIterator)(nil)
+
+type intIterator struct {
+	startTime uint64
+	interval  int
+	br        *bit.Reader
+	values    *XORDecoder
+
+	currVal  uint64
+	currTime uint64
+	index    int
+	err      error
+}
+
+func (i *intIterator) Next() bool {
+	var b bool
+	b, i.err = i.br.ReadBool()
+	if i.err != nil {
+		return false
+	}
+	if b {
+		if i.values.Next() {
+			i.currVal = i.values.Value()
+		}
+	}
+	i.currVal = 0
+	i.currTime = i.startTime + uint64(i.interval*i.index)
+	i.index++
+	return true
+}
+
+func (i *intIterator) Val() []byte {
+	return convert.Uint64ToBytes(i.currVal)
+}
+
+func (i *intIterator) Time() uint64 {
+	return i.currTime
+}
+
+func (i *intIterator) Error() error {
+	return i.err
+}
diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/plain.go
similarity index 54%
rename from pkg/encoding/stream_chunk.go
rename to pkg/encoding/plain.go
index 7ff5094..0b3fb8c 100644
--- a/pkg/encoding/stream_chunk.go
+++ b/pkg/encoding/plain.go
@@ -22,106 +22,147 @@ import (
 	"encoding/binary"
 	"fmt"
 	"sort"
+	"sync"
 
 	"github.com/klauspost/compress/zstd"
 	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/pkg/buffer"
 )
 
 var (
-	decoder, _               = zstd.NewReader(nil)
-	encoder, _               = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
-	_          SeriesEncoder = (*streamChunkEncoder)(nil)
-	_          SeriesDecoder = (*StreamChunkDecoder)(nil)
+	encoderPool = sync.Pool{
+		New: newPlainEncoder,
+	}
+	decoderPool = sync.Pool{
+		New: func() interface{} {
+			return &plainDecoder{}
+		},
+	}
+)
+
+type plainEncoderPool struct {
+	pool *sync.Pool
+	size int
+}
+
+func NewPlainEncoderPool(size int) SeriesEncoderPool {
+	return &plainEncoderPool{
+		pool: &encoderPool,
+		size: size,
+	}
+}
+
+func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+	encoder := b.pool.Get().(*plainEncoder)
+	encoder.Reset(metadata)
+	encoder.valueSize = b.size
+	return encoder
+}
+
+func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
+	b.pool.Put(encoder)
+}
+
+type plainDecoderPool struct {
+	pool *sync.Pool
+	size int
+}
+
+func NewPlainDecoderPool(size int) SeriesDecoderPool {
+	return &plainDecoderPool{
+		pool: &decoderPool,
+		size: size,
+	}
+}
+
+func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+	decoder := b.pool.Get().(*plainDecoder)
+	decoder.valueSize = b.size
+	return decoder
+}
+
+func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
+	b.pool.Put(decoder)
+}
+
+var (
+	zstdDecoder, _               = zstd.NewReader(nil)
+	zstdEncoder, _               = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+	_              SeriesEncoder = (*plainEncoder)(nil)
+	_              SeriesDecoder = (*plainDecoder)(nil)
 )
 
-//streamChunkEncoder backport to reduced value
-type streamChunkEncoder struct {
-	tsBuff    bytes.Buffer
-	valBuff   bytes.Buffer
-	scratch   [binary.MaxVarintLen64]byte
+//plainEncoder backport to reduced value
+type plainEncoder struct {
+	tsBuff    *buffer.Writer
+	valBuff   *buffer.Writer
 	len       uint32
 	num       uint32
 	startTime uint64
 	valueSize int
 }
 
-func NewStreamChunkEncoder(size int) SeriesEncoder {
-	return &streamChunkEncoder{
-		valueSize: size,
+func newPlainEncoder() interface{} {
+	return &plainEncoder{
+		tsBuff:  buffer.NewBufferWriter(&bytes.Buffer{}),
+		valBuff: buffer.NewBufferWriter(&bytes.Buffer{}),
 	}
 }
 
-func (t *streamChunkEncoder) Append(ts uint64, value []byte) {
+func (t *plainEncoder) Append(ts uint64, value []byte) {
 	if t.startTime == 0 {
 		t.startTime = ts
 	} else if t.startTime > ts {
 		t.startTime = ts
 	}
 	vLen := len(value)
-	offset := uint32(len(t.valBuff.Bytes()))
-	t.valBuff.Write(t.putUint32(uint32(vLen)))
+	offset := uint32(t.valBuff.Len())
+	t.valBuff.PutUint32(uint32(vLen))
 	t.valBuff.Write(value)
-	t.tsBuff.Write(t.putUint64(ts))
-	t.tsBuff.Write(t.putUint32(offset))
-	t.num = t.num + 1
+	t.tsBuff.PutUint64(ts)
+	t.tsBuff.PutUint32(offset)
+	t.num++
 }
 
-func (t *streamChunkEncoder) IsFull() bool {
+func (t *plainEncoder) IsFull() bool {
 	return t.valBuff.Len() >= t.valueSize
 }
 
-func (t *streamChunkEncoder) Reset() {
+func (t *plainEncoder) Reset(_ []byte) {
 	t.tsBuff.Reset()
 	t.valBuff.Reset()
 	t.num = 0
 	t.startTime = 0
 }
 
-func (t *streamChunkEncoder) Encode() ([]byte, error) {
+func (t *plainEncoder) Encode() ([]byte, error) {
 	if t.tsBuff.Len() < 1 {
 		return nil, ErrEncodeEmpty
 	}
 	val := t.valBuff.Bytes()
 	t.len = uint32(len(val))
-	_, err := t.tsBuff.WriteTo(&t.valBuff)
-	if err != nil {
-		return nil, err
-	}
-	t.valBuff.Write(t.putUint32(t.num))
-	t.valBuff.Write(t.putUint32(t.len))
+	t.tsBuff.WriteTo(t.valBuff)
+	t.valBuff.PutUint32(t.num)
+	t.valBuff.PutUint32(t.len)
 	data := t.valBuff.Bytes()
 	l := len(data)
 	dst := make([]byte, 0, compressBound(l))
-	dst = encoder.EncodeAll(data, dst)
-	result := make([]byte, len(dst)+2)
-	copy(result, dst)
-	copy(result[len(dst):], t.putUint16(uint16(l)))
-	return result, nil
+	dst = zstdEncoder.EncodeAll(data, dst)
+	result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, len(dst)+2)))
+	result.Write(dst)
+	result.PutUint16(uint16(l))
+	return result.Bytes(), nil
 }
 
 func compressBound(srcSize int) int {
 	return srcSize + (srcSize >> 8)
 }
 
-func (t *streamChunkEncoder) StartTime() uint64 {
+func (t *plainEncoder) StartTime() uint64 {
 	return t.startTime
 }
 
-func (t *streamChunkEncoder) putUint16(v uint16) []byte {
-	binary.LittleEndian.PutUint16(t.scratch[:], v)
-	return t.scratch[:2]
-}
-
-func (t *streamChunkEncoder) putUint32(v uint32) []byte {
-	binary.LittleEndian.PutUint32(t.scratch[:], v)
-	return t.scratch[:4]
-}
-
-func (t *streamChunkEncoder) putUint64(v uint64) []byte {
-	binary.LittleEndian.PutUint64(t.scratch[:], v)
-	return t.scratch[:8]
-}
-
 const (
 	// TsLen equals ts(uint64) + data_offset(uint32)
 	TsLen = 8 + 4
@@ -129,8 +170,8 @@ const (
 
 var ErrInvalidValue = errors.New("invalid encoded value")
 
-//StreamChunkDecoder decodes encoded time index
-type StreamChunkDecoder struct {
+//plainDecoder decodes encoded time index
+type plainDecoder struct {
 	ts        []byte
 	val       []byte
 	len       uint32
@@ -138,20 +179,14 @@ type StreamChunkDecoder struct {
 	valueSize int
 }
 
-func NewStreamChunkDecoder(size int) SeriesDecoder {
-	return &StreamChunkDecoder{
-		valueSize: size,
-	}
-}
-
-func (t *StreamChunkDecoder) Len() int {
+func (t *plainDecoder) Len() int {
 	return int(t.num)
 }
 
-func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
+func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
 	var data []byte
 	size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
-	if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
+	if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
 		return err
 	}
 	l := uint32(len(data))
@@ -170,11 +205,11 @@ func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
 	return nil
 }
 
-func (t *StreamChunkDecoder) IsFull() bool {
+func (t *plainDecoder) IsFull() bool {
 	return int(t.len) >= t.valueSize
 }
 
-func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
+func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
 	i := sort.Search(int(t.num), func(i int) bool {
 		slot := getTSSlot(t.ts, i)
 		return parseTS(slot) <= ts
@@ -189,7 +224,7 @@ func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
 	return getVal(t.val, parseOffset(slot))
 }
 
-func (t *StreamChunkDecoder) Iterator() SeriesIterator {
+func (t *plainDecoder) Iterator() SeriesIterator {
 	return newBlockItemIterator(t)
 }
 
@@ -213,17 +248,17 @@ func parseOffset(tsSlot []byte) uint32 {
 	return binary.LittleEndian.Uint32(tsSlot[8:])
 }
 
-var _ SeriesIterator = (*chunkIterator)(nil)
+var _ SeriesIterator = (*plainIterator)(nil)
 
-type chunkIterator struct {
+type plainIterator struct {
 	index []byte
 	data  []byte
 	idx   int
 	num   int
 }
 
-func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
-	return &chunkIterator{
+func newBlockItemIterator(decoder *plainDecoder) SeriesIterator {
+	return &plainIterator{
 		idx:   -1,
 		index: decoder.ts,
 		data:  decoder.val,
@@ -231,20 +266,20 @@ func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
 	}
 }
 
-func (b *chunkIterator) Next() bool {
+func (b *plainIterator) Next() bool {
 	b.idx++
 	return b.idx >= 0 && b.idx < b.num
 }
 
-func (b *chunkIterator) Val() []byte {
+func (b *plainIterator) Val() []byte {
 	v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx)))
 	return v
 }
 
-func (b *chunkIterator) Time() uint64 {
+func (b *plainIterator) Time() uint64 {
 	return parseTS(getTSSlot(b.index, b.idx))
 }
 
-func (b *chunkIterator) Error() error {
+func (b *plainIterator) Error() error {
 	return nil
 }
diff --git a/pkg/encoding/xor.go b/pkg/encoding/xor.go
new file mode 100644
index 0000000..43203ca
--- /dev/null
+++ b/pkg/encoding/xor.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"math/bits"
+
+	"github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+const (
+	ctrlBitsNoContainMeaningful = 0x2
+	ctrlBitsContainMeaningful   = 0x3
+)
+
+// XOREncoder compresses uint64 values using the XOR scheme described in the Gorilla paper:
+// https://www.vldb.org/pvldb/vol8/p1816-teller.pdf
+type XOREncoder struct {
+	bw       *bit.Writer
+	preVal   uint64
+	leading  int
+	trailing int
+
+	first bool
+}
+
+// NewXOREncoder creates an XOR encoder that writes compressed uint64 data to bw
+func NewXOREncoder(bw *bit.Writer) *XOREncoder {
+	return &XOREncoder{
+		bw:    bw,
+		first: true,
+	}
+}
+
+func (e *XOREncoder) Write(val uint64) {
+	if e.first {
+		e.first = false
+		e.preVal = val
+		e.bw.WriteBits(val, 64)
+		return
+	}
+
+	delta := val ^ e.preVal
+	e.preVal = val
+	if delta == 0 {
+		e.bw.WriteBool(false)
+		return
+	}
+
+	leading := bits.LeadingZeros64(delta)
+	trailing := bits.TrailingZeros64(delta)
+	if leading >= e.leading && trailing >= e.trailing {
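+		// write control '10' to reuse the previous block's meaningful-bit window; NOTE(review): leading/trailing start at 0, so this branch appears to always be taken — confirm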
+		e.bw.WriteBits(ctrlBitsNoContainMeaningful, 2)
+		e.bw.WriteBits(delta>>uint(e.trailing), 64-e.leading-e.trailing)
+	} else {
+		// write control '11' to start a new block of meaningful bits
+		e.bw.WriteBits(ctrlBitsContainMeaningful, 2)
+		meaningfulLen := 64 - leading - trailing
+		e.bw.WriteBits(uint64(leading), 6)
+		// meaningfulLen is at least 1, so we can subtract 1 from it and encode it in 6 bits
+		e.bw.WriteBits(uint64(meaningfulLen-1), 6)
+		e.bw.WriteBits(delta>>uint(trailing), meaningfulLen)
+
+		e.leading = leading
+		e.trailing = trailing
+	}
+}
+
+// XORDecoder decodes a buffer into uint64 values using XOR compression
+type XORDecoder struct {
+	val uint64
+
+	br *bit.Reader
+
+	leading  uint64
+	trailing uint64
+
+	first bool
+	err   error
+}
+
+// NewXORDecoder creates an XOR decoder that decompresses uint64 values read from br
+func NewXORDecoder(br *bit.Reader) *XORDecoder {
+	s := &XORDecoder{
+		br:    br,
+		first: true,
+	}
+	return s
+}
+
+// Reset restores the decoding state (first flag, leading, trailing, value); the underlying reader and err are left untouched
+func (d *XORDecoder) Reset() {
+	d.first = true
+	d.leading = 0
+	d.trailing = 0
+	d.val = 0
+}
+
+// Next decodes the next value and reports whether one was read; on a read error it returns false
+// and the error is available from Err. The data format is the one written by XOREncoder.
+func (d *XORDecoder) Next() bool {
+	if d.first {
+		// read first value
+		d.first = false
+		d.val, d.err = d.br.ReadBits(64)
+		return d.err == nil
+	}
+
+	var b bool
+	// read delta control bit
+	b, d.err = d.br.ReadBool()
+	if d.err != nil {
+		return false
+	}
+	if !b {
+		return true
+	}
+	ctrlBits := ctrlBitsNoContainMeaningful
+	// read control bit
+	b, d.err = d.br.ReadBool()
+	if d.err != nil {
+		return false
+	}
+	if b {
+		ctrlBits |= 1
+	}
+	var blockSize uint64
+	if ctrlBits == ctrlBitsNoContainMeaningful {
+		blockSize = 64 - d.leading - d.trailing
+	} else {
+		// read leading and block size, because this block differs from the previous one; trailing is derived from them
+		d.leading, d.err = d.br.ReadBits(6)
+		if d.err != nil {
+			return false
+		}
+		blockSize, d.err = d.br.ReadBits(6)
+		if d.err != nil {
+			return false
+		}
+		blockSize++
+		d.trailing = 64 - d.leading - blockSize
+	}
+	delta, err := d.br.ReadBits(int(blockSize))
+	if err != nil {
+		d.err = err
+		return false
+	}
+	val := delta << d.trailing
+	d.val ^= val
+	return true
+}
+
+// Value returns the current decoded uint64 value
+func (d *XORDecoder) Value() uint64 {
+	return d.val
+}
+
+// Err returns error raised in Next()
+func (d *XORDecoder) Err() error {
+	return d.err
+}
diff --git a/pkg/encoding/xor_test.go b/pkg/encoding/xor_test.go
new file mode 100644
index 0000000..282ab57
--- /dev/null
+++ b/pkg/encoding/xor_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+	"bytes"
+	"io"
+	"testing"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+func TestXOR(t *testing.T) {
+	var buf bytes.Buffer
+	bitWriter := bit.NewWriter(&buf)
+	e := NewXOREncoder(bitWriter)
+	e.Write(uint64(76))
+	e.Write(uint64(50))
+	e.Write(uint64(50))
+	e.Write(uint64(999999999))
+	e.Write(uint64(100))
+
+	bitWriter.Flush()
+	data := buf.Bytes()
+
+	reader := bit.NewReader(bytes.NewReader(data))
+	d := NewXORDecoder(reader)
+	a := assert.New(t)
+	verify(d, a, uint64(76))
+	verify(d, a, uint64(50))
+	verify(d, a, uint64(50))
+	verify(d, a, uint64(999999999))
+	verify(d, a, uint64(100))
+}
+
+func verify(d *XORDecoder, a *assert.Assertions, except uint64) {
+	a.True(d.Next())
+	if d.Err() != nil && !errors.Is(d.Err(), io.EOF) {
+		a.Fail("error: %v", d.Err())
+	}
+	a.Equal(except, d.Value())
+}