You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/11/20 02:30:08 UTC
[skywalking-banyandb] 01/02: Introduce gorilla encoding for integer values
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 08a9b4e815e655de3cef9172bc29f75d2b5ea6a0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Nov 20 08:31:18 2021 +0800
Introduce gorilla encoding for integer values
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 31 +++++
banyand/kv/kv.go | 16 ++-
banyand/stream/stream.go | 8 +-
banyand/tsdb/block.go | 2 +-
banyand/tsdb/tsdb.go | 6 +-
banyand/tsdb/tsdb_test.go | 8 +-
go.mod | 2 +-
go.sum | 4 +-
pkg/bit/reader.go | 104 +++++++++++++++
pkg/{bytes/bytes.go => bit/reader_test.go} | 41 ++++--
pkg/bit/writer.go | 92 ++++++++++++++
pkg/{bytes/bytes.go => bit/writer_test.go} | 50 ++++++--
pkg/buffer/writer.go | 72 +++++++++++
pkg/encoding/encoding.go | 14 ++-
pkg/encoding/int.go | 195 +++++++++++++++++++++++++++++
pkg/encoding/{stream_chunk.go => plain.go} | 175 +++++++++++++++-----------
pkg/encoding/xor.go | 177 ++++++++++++++++++++++++++
pkg/encoding/xor_test.go | 60 +++++++++
18 files changed, 932 insertions(+), 125 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 24820f0..728a298 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -259,6 +259,37 @@ func (l *badgerLog) Debugf(f string, v ...interface{}) {
l.delegated.Debug().Msgf(f, v...)
}
+var _ bydb.TSetEncoderPool = (*encoderPoolDelegate)(nil)
+
+// encoderPoolDelegate wraps an encoding.SeriesEncoderPool; a compile-time
+// assertion in this file checks that it satisfies bydb.TSetEncoderPool.
+type encoderPoolDelegate struct {
+ encoding.SeriesEncoderPool
+}
+
+// Get fetches an encoder for metadata from the wrapped pool.
+func (e *encoderPoolDelegate) Get(metadata []byte) bydb.TSetEncoder {
+ return e.SeriesEncoderPool.Get(metadata)
+}
+
+// Put hands encoder back to the wrapped pool.
+func (e *encoderPoolDelegate) Put(encoder bydb.TSetEncoder) {
+ e.SeriesEncoderPool.Put(encoder)
+}
+
+var _ bydb.TSetDecoderPool = (*decoderPoolDelegate)(nil)
+
+// decoderPoolDelegate wraps an encoding.SeriesDecoderPool; a compile-time
+// assertion in this file checks that it satisfies bydb.TSetDecoderPool.
+type decoderPoolDelegate struct {
+ encoding.SeriesDecoderPool
+}
+
+// Get fetches a decoder for metadata from the wrapped pool and wraps it in a
+// decoderDelegate.
+func (e *decoderPoolDelegate) Get(metadata []byte) bydb.TSetDecoder {
+ return &decoderDelegate{
+ e.SeriesDecoderPool.Get(metadata),
+ }
+}
+
+// Put unwraps decoder and returns the inner SeriesDecoder to the wrapped pool.
+// The type assertion is unchecked: passing anything other than a
+// *decoderDelegate panics.
+func (e *decoderPoolDelegate) Put(decoder bydb.TSetDecoder) {
+ dd := decoder.(*decoderDelegate)
+ e.SeriesDecoderPool.Put(dd.SeriesDecoder)
+}
+
var _ bydb.TSetDecoder = (*decoderDelegate)(nil)
type decoderDelegate struct {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index ea92d1c..2def4dc 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -23,7 +23,6 @@ import (
"math"
"github.com/dgraph-io/badger/v3"
- "github.com/dgraph-io/badger/v3/bydb"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -103,16 +102,15 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
}
}
-func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions {
+func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encoding.SeriesDecoderPool) TimeSeriesOptions {
return func(store TimeSeriesStore) {
if btss, ok := store.(*badgerTSS); ok {
- btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() bydb.TSetEncoder {
- return encoderFactory()
- }, func() bydb.TSetDecoder {
- return &decoderDelegate{
- SeriesDecoder: decoderFactory(),
- }
- })
+ btss.dbOpts = btss.dbOpts.WithExternalCompactor(
+ &encoderPoolDelegate{
+ encoderPool,
+ }, &decoderPoolDelegate{
+ decoderPool,
+ })
}
}
}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 760687d..444a483 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -107,12 +107,8 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error)
ShardNum: sm.schema.GetOpts().GetShardNum(),
IndexRules: spec.indexRules,
EncodingMethod: tsdb.EncodingMethod{
- EncoderFactory: func() encoding.SeriesEncoder {
- return encoding.NewStreamChunkEncoder(chunkSize)
- },
- DecoderFactory: func() encoding.SeriesDecoder {
- return encoding.NewStreamChunkDecoder(chunkSize)
- },
+ EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+ DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
},
})
if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 2f38308..956bf13 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -78,7 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
if b.store, err = kv.OpenTimeSeriesStore(
0,
b.path+"/store",
- kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory),
+ kv.TSSWithEncoding(encodingMethod.EncoderPool, encodingMethod.DecoderPool),
kv.TSSWithLogger(b.l),
); err != nil {
return nil, err
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 8df233b..e9ed19a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -82,8 +82,8 @@ type DatabaseOpts struct {
}
type EncodingMethod struct {
- EncoderFactory encoding.SeriesEncoderFactory
- DecoderFactory encoding.SeriesDecoderFactory
+ EncoderPool encoding.SeriesEncoderPool
+ DecoderPool encoding.SeriesDecoderPool
}
type database struct {
@@ -124,7 +124,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
db.logger = pl.Named("tsdb")
}
}
- if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil {
+ if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil {
return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
}
if _, err := mkdir(opts.Location); err != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 89c56d6..c570fdf 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -58,12 +58,8 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database
Location: tempDir,
ShardNum: 1,
EncodingMethod: EncodingMethod{
- EncoderFactory: func() encoding.SeriesEncoder {
- return nil
- },
- DecoderFactory: func() encoding.SeriesDecoder {
- return nil
- },
+ EncoderPool: encoding.NewPlainEncoderPool(0),
+ DecoderPool: encoding.NewPlainDecoderPool(0),
},
})
t.NoError(err)
diff --git a/go.mod b/go.mod
index 24e4ce0..b2815cb 100644
--- a/go.mod
+++ b/go.mod
@@ -101,4 +101,4 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a
diff --git a/go.sum b/go.sum
index 957109b..afb3bff 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a h1:kcUQmdVI0E0J8bfwJpbQhWOOxijKNeoEfLsiIkayf1E=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
diff --git a/pkg/bit/reader.go b/pkg/bit/reader.go
new file mode 100644
index 0000000..22cb929
--- /dev/null
+++ b/pkg/bit/reader.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+ "io"
+)
+
+// Reader reads single bits, bit groups and whole bytes from an io.ByteReader.
+type Reader struct {
+ // in is the underlying byte source.
+ in io.ByteReader
+ // cache holds the unread bits of the current byte, left-aligned.
+ cache byte
+ // len is the number of unread bits left in cache.
+ len byte
+}
+
+// NewReader creates a bit reader on top of in.
+func NewReader(in io.ByteReader) *Reader {
+ return &Reader{
+ in: in,
+ }
+}
+
+// ReadBool reads a single bit: 1 returns true, 0 returns false.
+// A new byte is fetched from the source when the cache is empty; the error of
+// that fetch is returned unchanged.
+func (r *Reader) ReadBool() (bool, error) {
+ if r.len == 0 {
+ b, err := r.in.ReadByte()
+ if err != nil {
+ return false, err
+ }
+ r.cache = b
+ r.len = 8
+ }
+ r.len--
+ // The next bit is always the most significant one of cache.
+ b := r.cache & 0x80
+ r.cache <<= 1
+ return b != 0, nil
+}
+
+// ReadBits reads numBits bits and returns them in the low bits of the result,
+// with the first bit read being the most significant.
+// Whole bytes are consumed through ReadByte and the remainder bit by bit.
+// There is no guard on numBits: for values above 64 the earliest bits are
+// shifted out of the result.
+func (r *Reader) ReadBits(numBits int) (uint64, error) {
+ var result uint64
+
+ for ; numBits >= 8; numBits -= 8 {
+ b, err := r.ReadByte()
+ if err != nil {
+ return 0, err
+ }
+
+ result = (result << 8) | uint64(b)
+ }
+
+ for ; numBits > 0; numBits-- {
+ byt, err := r.ReadBool()
+ if err != nil {
+ return 0, err
+ }
+ result <<= 1
+ if byt {
+ result |= 1
+ }
+ }
+
+ return result, nil
+}
+
+// ReadByte reads the next 8 bits, which may straddle two source bytes.
+func (r *Reader) ReadByte() (byte, error) {
+ if r.len == 0 {
+ // Byte-aligned: hand the source byte through. len stays 0, so the
+ // value stored in cache is never read back.
+ b, err := r.in.ReadByte()
+ if err != nil {
+ return b, err
+ }
+ r.cache = b
+ return b, err
+ }
+ b, err := r.in.ReadByte()
+ if err != nil {
+ return b, err
+ }
+ // Combine the len cached bits with the top 8-len bits of the new byte and
+ // keep the new byte's remaining len bits, left-aligned, in cache.
+ result := r.cache | b>>r.len
+ r.cache = b << (8 - r.len)
+ return result, nil
+}
+
+// Reset discards any cached bits. The underlying byte source is left
+// untouched.
+func (r *Reader) Reset() {
+ r.len = 0
+ r.cache = 0
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/reader_test.go
similarity index 56%
copy from pkg/bytes/bytes.go
copy to pkg/bit/reader_test.go
index b4d051c..6f30bbe 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/reader_test.go
@@ -15,17 +15,38 @@
// specific language governing permissions and limitations
// under the License.
-package bytes
+package bit
-func Join(s ...[]byte) []byte {
- n := 0
- for _, v := range s {
- n += len(v)
- }
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestReader mixes ReadByte, ReadBits and ReadBool calls over a fixed byte
+// sequence, including reads that cross byte boundaries.
+func TestReader(t *testing.T) {
+ data := []byte{3, 255, 0xcc, 0x1a, 0xbc, 0xde, 0x80}
+
+ r := NewReader(bytes.NewBuffer(data))
+ a := assert.New(t)
+
+ eq(a, byte(3))(r.ReadByte())
+ eq(a, uint64(255))(r.ReadBits(8))
+
+ // From here on the reader is no longer byte-aligned.
+ eq(a, uint64(0xc))(r.ReadBits(4))
+
+ eq(a, uint64(0xc1))(r.ReadBits(8))
+
+ eq(a, uint64(0xabcde))(r.ReadBits(20))
+
+ eq(a, true)(r.ReadBool())
+ eq(a, false)(r.ReadBool())
+
+}
- b, i := make([]byte, n), 0
- for _, v := range s {
- i += copy(b[i:], v)
+func eq(a *assert.Assertions, expected interface{}) func(interface{}, error) {
+ return func(actual interface{}, err error) {
+ a.NoError(err)
+ a.Equal(expected, actual)
}
- return b
}
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
new file mode 100644
index 0000000..6302f9e
--- /dev/null
+++ b/pkg/bit/writer.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+ "bytes"
+)
+
+// Writer writes single bits, bit groups and whole bytes to a bytes.Buffer.
+type Writer struct {
+ // out receives every completed byte.
+ out *bytes.Buffer
+ // cache is the byte being assembled; bits are filled from the most
+ // significant position downwards.
+ cache byte
+ // available is the number of free bit positions left in cache
+ // (8 means cache is empty).
+ available byte
+}
+
+// NewWriter creates a bit writer that appends to buffer.
+func NewWriter(buffer *bytes.Buffer) *Writer {
+ var bw Writer
+ bw.Reset(buffer)
+ return &bw
+}
+
+// Reset points the writer at buffer and drops any partially assembled byte.
+func (w *Writer) Reset(buffer *bytes.Buffer) {
+ w.out = buffer
+ w.cache = 0
+ w.available = 8
+}
+
+// WriteBool writes a boolean value
+// true: 1
+// false: 0
+// The assembled byte is emitted to the buffer once its 8th bit is written.
+func (w *Writer) WriteBool(b bool) {
+ if b {
+ w.cache |= 1 << (w.available - 1)
+ }
+
+ w.available--
+
+ if w.available == 0 {
+ // WriteByte never returns error
+ _ = w.out.WriteByte(w.cache)
+ w.cache = 0
+ w.available = 8
+ }
+}
+
+// WriteBits writes the numBits least-significant bits of u, most significant
+// bit first.
+func (w *Writer) WriteBits(u uint64, numBits int) {
+ // Move the bits to write to the top of the word.
+ u <<= 64 - uint(numBits)
+
+ for ; numBits >= 8; numBits -= 8 {
+ byt := byte(u >> 56)
+ w.WriteByte(byt)
+ u <<= 8
+ }
+
+ remainder := byte(u >> 56)
+ for ; numBits > 0; numBits-- {
+ w.WriteBool((remainder & 0x80) != 0)
+ remainder <<= 1
+ }
+}
+
+// WriteByte writes 8 bits. When the writer is not byte-aligned, the high bits
+// of b complete the pending byte and the rest of b becomes the new pending
+// byte; available is unchanged.
+func (w *Writer) WriteByte(b byte) {
+ _ = w.out.WriteByte(w.cache | (b >> (8 - w.available)))
+ w.cache = b << w.available
+}
+
+// Flush emits the partially assembled byte, if any, with its unused low bits
+// left as zero, and resets the writer onto the same buffer.
+func (w *Writer) Flush() {
+ if w.available != 8 {
+ _ = w.out.WriteByte(w.cache)
+ }
+ w.Reset(w.out)
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/writer_test.go
similarity index 57%
rename from pkg/bytes/bytes.go
rename to pkg/bit/writer_test.go
index b4d051c..7624ff8 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/writer_test.go
@@ -15,17 +15,41 @@
// specific language governing permissions and limitations
// under the License.
-package bytes
-
-func Join(s ...[]byte) []byte {
- n := 0
- for _, v := range s {
- n += len(v)
- }
-
- b, i := make([]byte, n), 0
- for _, v := range s {
- i += copy(b[i:], v)
- }
- return b
+package bit
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestWriter interleaves WriteByte, WriteBool, WriteBits and Flush calls and
+// compares the resulting buffer against the expected byte sequence.
+func TestWriter(t *testing.T) {
+ out := &bytes.Buffer{}
+ w := NewWriter(out)
+
+ a := assert.New(t)
+
+ w.WriteByte(0xc1)
+ w.WriteBool(false)
+ w.WriteBits(0x3f, 6)
+ w.WriteBool(true)
+ w.WriteByte(0xac)
+ w.WriteBits(0x01, 1)
+ w.WriteBits(0x1248f, 20)
+ w.Flush()
+
+ w.WriteByte(0x01)
+ w.WriteByte(0x02)
+
+ w.WriteBits(0x0f, 4)
+
+ w.WriteByte(0x80)
+ w.WriteByte(0x8f)
+ w.Flush()
+
+ w.WriteBits(0x01, 1)
+ w.WriteByte(0xff)
+ w.Flush()
+ a.Equal([]byte{0xc1, 0x7f, 0xac, 0x89, 0x24, 0x78, 0x01, 0x02, 0xf8, 0x08, 0xf0, 0xff, 0x80}, out.Bytes())
}
diff --git a/pkg/buffer/writer.go b/pkg/buffer/writer.go
new file mode 100644
index 0000000..e109d25
--- /dev/null
+++ b/pkg/buffer/writer.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+ "bytes"
+ "encoding/binary"
+)
+
+// Writer writes data into a buffer
+type Writer struct {
+ // buf is the destination buffer.
+ buf *bytes.Buffer
+
+ // scratch is reused by the PutUintXX methods to serialize integers.
+ scratch [binary.MaxVarintLen64]byte
+}
+
+// NewBufferWriter creates a Writer that appends to buf.
+func NewBufferWriter(buf *bytes.Buffer) *Writer {
+ return &Writer{
+ buf: buf,
+ }
+}
+
+// Write appends p to the buffer. The result of the underlying write is
+// discarded.
+func (w *Writer) Write(p []byte) {
+ _, _ = w.buf.Write(p)
+}
+
+// WriteTo moves the content of w's buffer into other's buffer and returns the
+// number of bytes reported by the underlying WriteTo. Its error is discarded.
+func (w *Writer) WriteTo(other *Writer) (n int64) {
+ n, _ = w.buf.WriteTo(other.buf)
+ return n
+}
+
+// PutUint16 appends v as 2 little-endian bytes.
+func (w *Writer) PutUint16(v uint16) {
+ binary.LittleEndian.PutUint16(w.scratch[:], v)
+ _, _ = w.buf.Write(w.scratch[:2])
+}
+
+// PutUint32 appends v as 4 little-endian bytes.
+func (w *Writer) PutUint32(v uint32) {
+ binary.LittleEndian.PutUint32(w.scratch[:], v)
+ _, _ = w.buf.Write(w.scratch[:4])
+}
+
+// PutUint64 appends v as 8 little-endian bytes.
+func (w *Writer) PutUint64(v uint64) {
+ binary.LittleEndian.PutUint64(w.scratch[:], v)
+ _, _ = w.buf.Write(w.scratch[:8])
+}
+
+// Reset resets the underlying buffer.
+func (w *Writer) Reset() {
+ w.buf.Reset()
+}
+
+// Len returns the length reported by the underlying buffer.
+func (w *Writer) Len() int {
+ return w.buf.Len()
+}
+
+// Bytes returns the underlying buffer's Bytes() result without copying it.
+func (w *Writer) Bytes() []byte {
+ return w.buf.Bytes()
+}
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 5e04e72..05a1f14 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -21,7 +21,10 @@ import "github.com/pkg/errors"
var ErrEncodeEmpty = errors.New("encode an empty value")
-type SeriesEncoderFactory func() SeriesEncoder
+type SeriesEncoderPool interface {
+ Get(metadata []byte) SeriesEncoder
+ Put(encoder SeriesEncoder)
+}
// SeriesEncoder encodes time series data point
type SeriesEncoder interface {
@@ -30,19 +33,22 @@ type SeriesEncoder interface {
// IsFull returns whether the encoded data reached its capacity
IsFull() bool
// Reset the underlying buffer
- Reset()
+ Reset(key []byte)
// Encode the time series data point to a binary
Encode() ([]byte, error)
// StartTime indicates the first entry's time
StartTime() uint64
}
-type SeriesDecoderFactory func() SeriesDecoder
+type SeriesDecoderPool interface {
+ Get(metadata []byte) SeriesDecoder
+ Put(encoder SeriesDecoder)
+}
// SeriesDecoder decodes encoded time series data
type SeriesDecoder interface {
// Decode the time series data
- Decode(data []byte) error
+ Decode(key, data []byte) error
// Len denotes the size of iterator
Len() int
// IsFull returns whether the encoded data reached its capacity
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
new file mode 100644
index 0000000..9571ec4
--- /dev/null
+++ b/pkg/encoding/int.go
@@ -0,0 +1,195 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "bytes"
+ "encoding/binary"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/pkg/bit"
+ "github.com/apache/skywalking-banyandb/pkg/buffer"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+ _ SeriesEncoder = (*intEncoder)(nil)
+)
+
+// ParseInterval maps a key to the time interval used to space the data points
+// of the series encoded under that key.
+type ParseInterval = func(key []byte) time.Duration
+
+// intEncoder encodes a series of integer values. For every slot it writes one
+// presence bit; present values are passed through an XOREncoder that shares
+// the same bit writer.
+type intEncoder struct {
+ // buff is the output buffer that bw writes into.
+ buff *bytes.Buffer
+ // bw is the bit writer on top of buff.
+ bw *bit.Writer
+ // values compresses the appended values through bw.
+ values *XOREncoder
+ // fn resolves the interval from the key passed to Reset.
+ fn ParseInterval
+ // interval is the spacing between slots; it is only set by Reset.
+ interval time.Duration
+ // startTime is the timestamp of the first appended point; 0 means unset.
+ startTime uint64
+ // num counts the slots written so far.
+ num int
+ // size is the slot count at which IsFull reports true.
+ size int
+}
+
+// NewIntEncoder creates an integer series encoder that is full after size
+// slots and resolves its interval through fn.
+// The interval is not set here; it is assigned when Reset is called.
+func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+ buff := &bytes.Buffer{}
+ bw := bit.NewWriter(buff)
+ return &intEncoder{
+ buff: buff,
+ bw: bw,
+ values: NewXOREncoder(bw),
+ fn: fn,
+ size: size,
+ }
+}
+
+// Append adds the data point (ts, value) to the chunk.
+//
+// The point is stored in slot (ts-startTime)/interval, matching the decoder,
+// which reports slot k at startTime + k*interval. Empty slots between the
+// last written slot and this one are filled with a single false bit each.
+//
+// The point is silently dropped when:
+//   - value is longer than 8 bytes,
+//   - the interval is not positive (Reset has not been called, or fn returned
+//     a non-positive duration), which would otherwise divide by zero,
+//   - ts is earlier than the first appended timestamp,
+//   - ts maps to a slot that has already been written.
+//
+// Values shorter than 8 bytes are zero-extended before being read as a
+// little-endian uint64, instead of panicking on the short slice.
+//
+// NOTE(review): ts and interval are assumed to use the same unit; confirm
+// against the ParseInterval implementations.
+func (ie *intEncoder) Append(ts uint64, value []byte) {
+	if len(value) > 8 || ie.interval <= 0 {
+		return
+	}
+	if ie.startTime == 0 {
+		ie.startTime = ts
+	}
+	gap := int(ts) - int(ie.startTime)
+	if gap < 0 {
+		return
+	}
+	slot := gap / int(ie.interval)
+	if slot < ie.num {
+		// Writing it would shift every later point by one slot.
+		return
+	}
+	// ie.num is the index of the next slot to write; pad up to the target.
+	for ie.num < slot {
+		ie.bw.WriteBool(false)
+		ie.num++
+	}
+	var raw [8]byte
+	copy(raw[:], value)
+	ie.bw.WriteBool(true)
+	ie.values.Write(binary.LittleEndian.Uint64(raw[:]))
+	ie.num++
+}
+
+// IsFull reports whether the number of written slots has reached size.
+func (ie *intEncoder) IsFull() bool {
+ return ie.num >= ie.size
+}
+
+// Reset prepares the encoder for a new chunk identified by key.
+//
+// All per-chunk state is cleared: the output buffer, the bit writer, the XOR
+// state, the slot counter and the start time. The interval is re-resolved
+// from key through fn.
+//
+// A fresh buffer is allocated instead of truncating the old one because
+// Encode returns the buffer's bytes without copying; truncating would let the
+// next chunk overwrite a slice the caller may still hold.
+func (ie *intEncoder) Reset(key []byte) {
+	ie.buff = &bytes.Buffer{}
+	// The bit writer must keep a non-nil target, otherwise the next write
+	// dereferences a nil *bytes.Buffer.
+	ie.bw.Reset(ie.buff)
+	ie.values = NewXOREncoder(ie.bw)
+	ie.interval = ie.fn(key)
+	ie.startTime = 0
+	ie.num = 0
+}
+
+// Encode finalizes the chunk and returns its binary form:
+//
+//	[bit stream, zero-padded to a byte][startTime uint64 LE][slot count uint16 LE]
+//
+// The trailer carries the number of slots actually written (num), which is
+// what the decoder reads back as its length, not the configured capacity.
+// ErrEncodeEmpty is returned when nothing has been appended, in line with the
+// plain encoder.
+//
+// The returned slice aliases the internal buffer. Encode appends the trailer
+// on every call, so it must be called once per chunk.
+//
+// NOTE(review): the slot count is truncated to 16 bits; confirm that size
+// never exceeds 65535.
+func (ie *intEncoder) Encode() ([]byte, error) {
+	if ie.num == 0 {
+		return nil, ErrEncodeEmpty
+	}
+	ie.bw.Flush()
+	buffWriter := buffer.NewBufferWriter(ie.buff)
+	buffWriter.PutUint64(ie.startTime)
+	buffWriter.PutUint16(uint16(ie.num))
+	return ie.buff.Bytes(), nil
+}
+
+// StartTime returns the timestamp recorded for the first appended point, or 0
+// if none has been recorded.
+func (ie *intEncoder) StartTime() uint64 {
+ return ie.startTime
+}
+
+var _ SeriesDecoder = (*intDecoder)(nil)
+
+// intDecoder decodes chunks produced by intEncoder.
+type intDecoder struct {
+ // fn resolves the interval from the key passed to Decode.
+ fn ParseInterval
+ // size is the slot count at which IsFull reports true.
+ size int
+ // interval is the spacing between slots of the decoded chunk.
+ interval time.Duration
+ // startTime is read from the chunk trailer.
+ startTime uint64
+ // num is the count read from the chunk trailer.
+ num int
+ // area is the chunk without its 10-byte trailer.
+ area []byte
+}
+
+// NewIntDecoder creates an integer series decoder with capacity size that
+// resolves its interval through fn.
+func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
+ return &intDecoder{
+ fn: fn,
+ size: size,
+ }
+}
+
+// Decode parses the trailer of data and keeps the remaining bytes for
+// iteration. The interval is resolved from key through fn.
+//
+// The last 10 bytes of data are the trailer: an 8-byte little-endian start
+// time followed by a 2-byte little-endian count. ErrInvalidValue is returned
+// when data is too short to hold it.
+//
+// The receiver is a pointer so that the parsed state survives the call; with
+// a value receiver every assignment was made on a copy and discarded.
+func (i *intDecoder) Decode(key, data []byte) error {
+	const trailerLen = 8 + 2
+	if len(data) < trailerLen {
+		return ErrInvalidValue
+	}
+	body := len(data) - trailerLen
+	i.interval = i.fn(key)
+	i.startTime = binary.LittleEndian.Uint64(data[body : body+8])
+	i.num = int(binary.LittleEndian.Uint16(data[body+8:]))
+	i.area = data[:body]
+	return nil
+}
+
+// Len returns the count read from the chunk trailer.
+func (i intDecoder) Len() int {
+ return i.num
+}
+
+// IsFull reports whether the decoded count has reached size.
+func (i intDecoder) IsFull() bool {
+ return i.num >= i.size
+}
+
+// Get scans the chunk from the beginning and returns the value of the first
+// point whose time equals ts. It returns (nil, nil) when no point matches;
+// the iterator's error, if any, is not surfaced.
+func (i intDecoder) Get(ts uint64) ([]byte, error) {
+ for iter := i.Iterator(); iter.Next(); {
+ if iter.Time() == ts {
+ return iter.Val(), nil
+ }
+ }
+ return nil, nil
+}
+
+// Iterator returns an iterator over the decoded chunk, starting at slot 0.
+// The iterator is bounded by the count read from the trailer so that the zero
+// bits padding the last byte of the bit stream are not reported as points.
+func (i intDecoder) Iterator() SeriesIterator {
+	br := bit.NewReader(bytes.NewReader(i.area))
+	return &intIterator{
+		startTime: i.startTime,
+		interval:  int(i.interval),
+		br:        br,
+		values:    NewXORDecoder(br),
+		size:      i.num,
+	}
+}
+
+var _ SeriesIterator = (*intIterator)(nil)
+
+// intIterator walks the slots of an integer chunk in order.
+type intIterator struct {
+	// startTime is the time of slot 0.
+	startTime uint64
+	// interval is the spacing between slots.
+	interval int
+	// br reads the presence bits; values decodes from the same reader.
+	br     *bit.Reader
+	values *XORDecoder
+
+	// currVal and currTime describe the slot returned by the last Next.
+	currVal  uint64
+	currTime uint64
+	// index is the slot the next call to Next will read.
+	index int
+	// size is the number of slots in the chunk.
+	size int
+	// err is the last error returned by the bit reader.
+	err error
+}
+
+// Next advances to the next slot and reports whether one was available.
+// It returns false once size slots have been read or when the presence bit
+// cannot be read; in the latter case the error is available through Error.
+//
+// A slot whose presence bit is 0, or whose value cannot be decoded, is
+// reported with value 0.
+// NOTE(review): empty slots are surfaced as zero-valued points rather than
+// skipped, as before; confirm that is what callers expect.
+func (i *intIterator) Next() bool {
+	if i.index >= i.size {
+		return false
+	}
+	var b bool
+	b, i.err = i.br.ReadBool()
+	if i.err != nil {
+		return false
+	}
+	// Reset first, then overwrite with the decoded value, so that a decoded
+	// value is not clobbered and an empty slot does not repeat the previous
+	// value.
+	i.currVal = 0
+	if b && i.values.Next() {
+		i.currVal = i.values.Value()
+	}
+	i.currTime = i.startTime + uint64(i.interval*i.index)
+	i.index++
+	return true
+}
+
+// Val returns the current value converted to bytes by convert.Uint64ToBytes.
+func (i *intIterator) Val() []byte {
+ return convert.Uint64ToBytes(i.currVal)
+}
+
+// Time returns the time computed for the current slot.
+func (i *intIterator) Time() uint64 {
+ return i.currTime
+}
+
+// Error returns the error recorded by the last call to Next, if any.
+func (i *intIterator) Error() error {
+ return i.err
+}
diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/plain.go
similarity index 54%
rename from pkg/encoding/stream_chunk.go
rename to pkg/encoding/plain.go
index 7ff5094..0b3fb8c 100644
--- a/pkg/encoding/stream_chunk.go
+++ b/pkg/encoding/plain.go
@@ -22,106 +22,147 @@ import (
"encoding/binary"
"fmt"
"sort"
+ "sync"
"github.com/klauspost/compress/zstd"
"github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/pkg/buffer"
)
var (
- decoder, _ = zstd.NewReader(nil)
- encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
- _ SeriesEncoder = (*streamChunkEncoder)(nil)
- _ SeriesDecoder = (*StreamChunkDecoder)(nil)
+ encoderPool = sync.Pool{
+ New: newPlainEncoder,
+ }
+ decoderPool = sync.Pool{
+ New: func() interface{} {
+ return &plainDecoder{}
+ },
+ }
+)
+
+// plainEncoderPool hands out plainEncoder instances from a sync.Pool.
+type plainEncoderPool struct {
+ // pool is the backing sync.Pool.
+ pool *sync.Pool
+ // size is assigned to valueSize of every encoder handed out.
+ size int
+}
+
+// NewPlainEncoderPool creates a pool of plain encoders backed by the
+// package-level encoderPool; size becomes each encoder's valueSize.
+func NewPlainEncoderPool(size int) SeriesEncoderPool {
+ return &plainEncoderPool{
+ pool: &encoderPool,
+ size: size,
+ }
+}
+
+// Get takes an encoder from the pool, resets it with metadata and applies
+// this pool's size.
+func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+ encoder := b.pool.Get().(*plainEncoder)
+ encoder.Reset(metadata)
+ encoder.valueSize = b.size
+ return encoder
+}
+
+// Put returns encoder to the pool.
+func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
+ b.pool.Put(encoder)
+}
+
+// plainDecoderPool hands out plainDecoder instances from a sync.Pool.
+type plainDecoderPool struct {
+ // pool is the backing sync.Pool.
+ pool *sync.Pool
+ // size is assigned to valueSize of every decoder handed out.
+ size int
+}
+
+// NewPlainDecoderPool creates a pool of plain decoders backed by the
+// package-level decoderPool; size becomes each decoder's valueSize.
+func NewPlainDecoderPool(size int) SeriesDecoderPool {
+ return &plainDecoderPool{
+ pool: &decoderPool,
+ size: size,
+ }
+}
+
+// Get takes a decoder from the pool and applies this pool's size. The
+// metadata argument is ignored.
+func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+ decoder := b.pool.Get().(*plainDecoder)
+ decoder.valueSize = b.size
+ return decoder
+}
+
+// Put returns decoder to the pool.
+func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
+ b.pool.Put(decoder)
+}
+
+var (
+ zstdDecoder, _ = zstd.NewReader(nil)
+ zstdEncoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+ _ SeriesEncoder = (*plainEncoder)(nil)
+ _ SeriesDecoder = (*plainDecoder)(nil)
)
-//streamChunkEncoder backport to reduced value
-type streamChunkEncoder struct {
- tsBuff bytes.Buffer
- valBuff bytes.Buffer
- scratch [binary.MaxVarintLen64]byte
+//plainEncoder backport to reduced value
+type plainEncoder struct {
+ tsBuff *buffer.Writer
+ valBuff *buffer.Writer
len uint32
num uint32
startTime uint64
valueSize int
}
-func NewStreamChunkEncoder(size int) SeriesEncoder {
- return &streamChunkEncoder{
- valueSize: size,
+func newPlainEncoder() interface{} {
+ return &plainEncoder{
+ tsBuff: buffer.NewBufferWriter(&bytes.Buffer{}),
+ valBuff: buffer.NewBufferWriter(&bytes.Buffer{}),
}
}
-func (t *streamChunkEncoder) Append(ts uint64, value []byte) {
+func (t *plainEncoder) Append(ts uint64, value []byte) {
if t.startTime == 0 {
t.startTime = ts
} else if t.startTime > ts {
t.startTime = ts
}
vLen := len(value)
- offset := uint32(len(t.valBuff.Bytes()))
- t.valBuff.Write(t.putUint32(uint32(vLen)))
+ offset := uint32(t.valBuff.Len())
+ t.valBuff.PutUint32(uint32(vLen))
t.valBuff.Write(value)
- t.tsBuff.Write(t.putUint64(ts))
- t.tsBuff.Write(t.putUint32(offset))
- t.num = t.num + 1
+ t.tsBuff.PutUint64(ts)
+ t.tsBuff.PutUint32(offset)
+ t.num++
}
-func (t *streamChunkEncoder) IsFull() bool {
+func (t *plainEncoder) IsFull() bool {
return t.valBuff.Len() >= t.valueSize
}
-func (t *streamChunkEncoder) Reset() {
+func (t *plainEncoder) Reset(_ []byte) {
t.tsBuff.Reset()
t.valBuff.Reset()
t.num = 0
t.startTime = 0
}
-func (t *streamChunkEncoder) Encode() ([]byte, error) {
+// Encode appends the timestamp index and the num/len trailer to the value
+// buffer, compresses the whole with zstd and returns the compressed bytes
+// followed by the uncompressed length as a little-endian uint16.
+// It returns ErrEncodeEmpty when nothing has been appended.
+// NOTE(review): the uncompressed length is truncated to 16 bits; confirm
+// chunks never exceed 65535 bytes.
+func (t *plainEncoder) Encode() ([]byte, error) {
if t.tsBuff.Len() < 1 {
return nil, ErrEncodeEmpty
}
val := t.valBuff.Bytes()
t.len = uint32(len(val))
- _, err := t.tsBuff.WriteTo(&t.valBuff)
- if err != nil {
- return nil, err
- }
- t.valBuff.Write(t.putUint32(t.num))
- t.valBuff.Write(t.putUint32(t.len))
+ t.tsBuff.WriteTo(t.valBuff)
+ t.valBuff.PutUint32(t.num)
+ t.valBuff.PutUint32(t.len)
data := t.valBuff.Bytes()
l := len(data)
dst := make([]byte, 0, compressBound(l))
- dst = encoder.EncodeAll(data, dst)
- result := make([]byte, len(dst)+2)
- copy(result, dst)
- copy(result[len(dst):], t.putUint16(uint16(l)))
- return result, nil
+ dst = zstdEncoder.EncodeAll(data, dst)
+ // bytes.NewBuffer treats its argument as existing content, so the slice
+ // must have length 0 (capacity only); a non-zero length would prefix the
+ // compressed frame with zero bytes and break decoding.
+ result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, 0, len(dst)+2)))
+ result.Write(dst)
+ result.PutUint16(uint16(l))
+ return result.Bytes(), nil
}
func compressBound(srcSize int) int {
return srcSize + (srcSize >> 8)
}
-func (t *streamChunkEncoder) StartTime() uint64 {
+func (t *plainEncoder) StartTime() uint64 {
return t.startTime
}
-func (t *streamChunkEncoder) putUint16(v uint16) []byte {
- binary.LittleEndian.PutUint16(t.scratch[:], v)
- return t.scratch[:2]
-}
-
-func (t *streamChunkEncoder) putUint32(v uint32) []byte {
- binary.LittleEndian.PutUint32(t.scratch[:], v)
- return t.scratch[:4]
-}
-
-func (t *streamChunkEncoder) putUint64(v uint64) []byte {
- binary.LittleEndian.PutUint64(t.scratch[:], v)
- return t.scratch[:8]
-}
-
const (
// TsLen equals ts(uint64) + data_offset(uint32)
TsLen = 8 + 4
@@ -129,8 +170,8 @@ const (
var ErrInvalidValue = errors.New("invalid encoded value")
-//StreamChunkDecoder decodes encoded time index
-type StreamChunkDecoder struct {
+//plainDecoder decodes encoded time index
+type plainDecoder struct {
ts []byte
val []byte
len uint32
@@ -138,20 +179,14 @@ type StreamChunkDecoder struct {
valueSize int
}
-func NewStreamChunkDecoder(size int) SeriesDecoder {
- return &StreamChunkDecoder{
- valueSize: size,
- }
-}
-
-func (t *StreamChunkDecoder) Len() int {
+func (t *plainDecoder) Len() int {
return int(t.num)
}
-func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
+func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
var data []byte
size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
- if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
+ if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
return err
}
l := uint32(len(data))
@@ -170,11 +205,11 @@ func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
return nil
}
-func (t *StreamChunkDecoder) IsFull() bool {
+func (t *plainDecoder) IsFull() bool {
return int(t.len) >= t.valueSize
}
-func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
+func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
i := sort.Search(int(t.num), func(i int) bool {
slot := getTSSlot(t.ts, i)
return parseTS(slot) <= ts
@@ -189,7 +224,7 @@ func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
return getVal(t.val, parseOffset(slot))
}
-func (t *StreamChunkDecoder) Iterator() SeriesIterator {
+func (t *plainDecoder) Iterator() SeriesIterator {
return newBlockItemIterator(t)
}
@@ -213,17 +248,17 @@ func parseOffset(tsSlot []byte) uint32 {
return binary.LittleEndian.Uint32(tsSlot[8:])
}
-var _ SeriesIterator = (*chunkIterator)(nil)
+var _ SeriesIterator = (*plainIterator)(nil)
-type chunkIterator struct {
+type plainIterator struct {
index []byte
data []byte
idx int
num int
}
-func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
- return &chunkIterator{
+func newBlockItemIterator(decoder *plainDecoder) SeriesIterator {
+ return &plainIterator{
idx: -1,
index: decoder.ts,
data: decoder.val,
@@ -231,20 +266,20 @@ func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
}
}
-func (b *chunkIterator) Next() bool {
+func (b *plainIterator) Next() bool {
b.idx++
return b.idx >= 0 && b.idx < b.num
}
-func (b *chunkIterator) Val() []byte {
+func (b *plainIterator) Val() []byte {
v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx)))
return v
}
-func (b *chunkIterator) Time() uint64 {
+func (b *plainIterator) Time() uint64 {
return parseTS(getTSSlot(b.index, b.idx))
}
-func (b *chunkIterator) Error() error {
+func (b *plainIterator) Error() error {
return nil
}
diff --git a/pkg/encoding/xor.go b/pkg/encoding/xor.go
new file mode 100644
index 0000000..43203ca
--- /dev/null
+++ b/pkg/encoding/xor.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "math/bits"
+
+ "github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+const (
+ ctrlBitsNoContainMeaningful = 0x2
+ ctrlBitsContainMeaningful = 0x3
+)
+
+// XOREncoder intends to compress uint64 data
+// https://www.vldb.org/pvldb/vol8/p1816-teller.pdf
+type XOREncoder struct {
+ bw *bit.Writer
+ preVal uint64
+ leading int
+ trailing int
+
+ first bool
+}
+
+// NewXOREncoder creates an XOR encoder that compresses uint64 data into the given bit writer
+func NewXOREncoder(bw *bit.Writer) *XOREncoder {
+ return &XOREncoder{
+ bw: bw,
+ first: true,
+ }
+}
+
+func (e *XOREncoder) Write(val uint64) {
+ if e.first {
+ e.first = false
+ e.preVal = val
+ e.bw.WriteBits(val, 64)
+ return
+ }
+
+ delta := val ^ e.preVal
+ e.preVal = val
+ if delta == 0 {
+ e.bw.WriteBool(false)
+ return
+ }
+
+ leading := bits.LeadingZeros64(delta)
+ trailing := bits.TrailingZeros64(delta)
+ if leading >= e.leading && trailing >= e.trailing {
+ // write control '10' to reuse previous block meaningful bits
+ e.bw.WriteBits(ctrlBitsNoContainMeaningful, 2)
+ e.bw.WriteBits(delta>>uint(e.trailing), 64-e.leading-e.trailing)
+ } else {
+ // write control '11' to create a new block meaningful bits
+ e.bw.WriteBits(ctrlBitsContainMeaningful, 2)
+ meaningfulLen := 64 - leading - trailing
+ e.bw.WriteBits(uint64(leading), 6)
+ // meaningfulLen is at least 1, so we can subtract 1 from it and encode it in 6 bits
+ e.bw.WriteBits(uint64(meaningfulLen-1), 6)
+ e.bw.WriteBits(delta>>uint(trailing), meaningfulLen)
+
+ e.leading = leading
+ e.trailing = trailing
+ }
+}
+
+// XORDecoder decodes buffer to uint64 values using xor compress
+type XORDecoder struct {
+ val uint64
+
+ br *bit.Reader
+
+ leading uint64
+ trailing uint64
+
+ first bool
+ err error
+}
+
+// NewXORDecoder creates an XOR decoder that decompresses uint64 values from the given bit reader
+func NewXORDecoder(br *bit.Reader) *XORDecoder {
+ s := &XORDecoder{
+ br: br,
+ first: true,
+ }
+ return s
+}
+
+// Reset restores the first-value flag, leading/trailing counts and current value; the bit reader and any recorded error are left untouched
+func (d *XORDecoder) Reset() {
+ d.first = true
+ d.leading = 0
+ d.trailing = 0
+ d.val = 0
+}
+
+// Next decodes the next value from the bit reader and reports whether one is available;
+// it returns false on a read error. The bit layout mirrors the one written by XOREncoder.Write
+func (d *XORDecoder) Next() bool {
+ if d.first {
+ // read first value
+ d.first = false
+ d.val, d.err = d.br.ReadBits(64)
+ return d.err == nil
+ }
+
+ var b bool
+ // read delta control bit
+ b, d.err = d.br.ReadBool()
+ if d.err != nil {
+ return false
+ }
+ if !b {
+ return true
+ }
+ ctrlBits := ctrlBitsNoContainMeaningful
+ // read control bit
+ b, d.err = d.br.ReadBool()
+ if d.err != nil {
+ return false
+ }
+ if b {
+ ctrlBits |= 1
+ }
+ var blockSize uint64
+ if ctrlBits == ctrlBitsNoContainMeaningful {
+ blockSize = 64 - d.leading - d.trailing
+ } else {
+ // read the leading-zero count and the block size (trailing is derived from them), because this block differs from the previous one
+ d.leading, d.err = d.br.ReadBits(6)
+ if d.err != nil {
+ return false
+ }
+ blockSize, d.err = d.br.ReadBits(6)
+ if d.err != nil {
+ return false
+ }
+ blockSize++
+ d.trailing = 64 - d.leading - blockSize
+ }
+ delta, err := d.br.ReadBits(int(blockSize))
+ if err != nil {
+ d.err = err
+ return false
+ }
+ val := delta << d.trailing
+ d.val ^= val
+ return true
+}
+
+// Value returns the uint64 currently held by the decoder, as updated by Next
+func (d *XORDecoder) Value() uint64 {
+ return d.val
+}
+
+// Err returns error raised in Next()
+func (d *XORDecoder) Err() error {
+ return d.err
+}
diff --git a/pkg/encoding/xor_test.go b/pkg/encoding/xor_test.go
new file mode 100644
index 0000000..282ab57
--- /dev/null
+++ b/pkg/encoding/xor_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+ "bytes"
+ "io"
+ "testing"
+
+ "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+func TestXOR(t *testing.T) {
+ var buf bytes.Buffer
+ bitWriter := bit.NewWriter(&buf)
+ e := NewXOREncoder(bitWriter)
+ e.Write(uint64(76))
+ e.Write(uint64(50))
+ e.Write(uint64(50))
+ e.Write(uint64(999999999))
+ e.Write(uint64(100))
+
+ bitWriter.Flush()
+ data := buf.Bytes()
+
+ reader := bit.NewReader(bytes.NewReader(data))
+ d := NewXORDecoder(reader)
+ a := assert.New(t)
+ verify(d, a, uint64(76))
+ verify(d, a, uint64(50))
+ verify(d, a, uint64(50))
+ verify(d, a, uint64(999999999))
+ verify(d, a, uint64(100))
+}
+
+func verify(d *XORDecoder, a *assert.Assertions, except uint64) {
+ a.True(d.Next())
+ if d.Err() != nil && !errors.Is(d.Err(), io.EOF) {
+ a.Fail("error: %v", d.Err())
+ }
+ a.Equal(except, d.Value())
+}